1
0
mirror of synced 2025-12-25 02:09:19 -05:00

feat(source-snapchat-marketing): Add comprehensive mock server tests (#70300)

Co-authored-by: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com>
Co-authored-by: suisui.xia@airbyte.io <suisui.xia@airbyte.io>
This commit is contained in:
devin-ai-integration[bot]
2025-12-11 16:33:59 -08:00
committed by GitHub
parent deb011558a
commit c76ede22e9
34 changed files with 6942 additions and 0 deletions

View File

@@ -0,0 +1,3 @@
#
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
#

View File

@@ -0,0 +1,53 @@
#
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
#
import os
import sys
from pathlib import Path
from pytest import fixture
from airbyte_cdk.sources.declarative.yaml_declarative_source import YamlDeclarativeSource
from airbyte_cdk.test.catalog_builder import CatalogBuilder
from airbyte_cdk.test.state_builder import StateBuilder
pytest_plugins = ["airbyte_cdk.test.utils.manifest_only_fixtures"]
@fixture(autouse=True)
def mock_sleep(mocker):
mocker.patch("time.sleep")
def _get_manifest_path() -> Path:
source_declarative_manifest_path = Path("/airbyte/integration_code/source_declarative_manifest")
if source_declarative_manifest_path.exists():
return source_declarative_manifest_path
return Path(__file__).parent.parent
_SOURCE_FOLDER_PATH = _get_manifest_path()
_YAML_FILE_PATH = _SOURCE_FOLDER_PATH / "manifest.yaml"
sys.path.append(str(_SOURCE_FOLDER_PATH))
def get_source(config, state=None) -> YamlDeclarativeSource:
catalog = CatalogBuilder().build()
state = StateBuilder().build() if not state else state
return YamlDeclarativeSource(path_to_yaml=str(_YAML_FILE_PATH), catalog=catalog, config=config, state=state)
def find_stream(stream_name, config, state=None):
state = StateBuilder().build() if not state else state
streams = get_source(config, state).streams(config=config)
for stream in streams:
if stream.name == stream_name:
return stream
raise ValueError(f"Stream {stream_name} not found")
SNAPCHAT_API_URL = "https://adsapi.snapchat.com/v1"
OAUTH_TOKEN_URL = "https://accounts.snapchat.com/login/oauth2/access_token"

View File

@@ -0,0 +1,3 @@
#
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
#

View File

@@ -0,0 +1,79 @@
#
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
#
from __future__ import annotations
from typing import Any, List, MutableMapping, Optional
CLIENT_ID = "test_client_id"
CLIENT_SECRET = "test_client_secret"
REFRESH_TOKEN = "test_refresh_token"
ACCESS_TOKEN = "test_access_token"
ORGANIZATION_ID = "test_org_123"
AD_ACCOUNT_ID = "test_adaccount_456"
CAMPAIGN_ID = "test_campaign_789"
ADSQUAD_ID = "test_adsquad_012"
AD_ID = "test_ad_345"
START_DATE = "2024-01-01"
END_DATE = "2024-01-31"
class ConfigBuilder:
def __init__(self) -> None:
self._config: MutableMapping[str, Any] = {
"client_id": CLIENT_ID,
"client_secret": CLIENT_SECRET,
"refresh_token": REFRESH_TOKEN,
"start_date": START_DATE,
"end_date": END_DATE,
"action_report_time": "conversion",
"swipe_up_attribution_window": "28_DAY",
"view_attribution_window": "1_DAY",
}
def with_client_id(self, client_id: str) -> "ConfigBuilder":
self._config["client_id"] = client_id
return self
def with_client_secret(self, client_secret: str) -> "ConfigBuilder":
self._config["client_secret"] = client_secret
return self
def with_refresh_token(self, refresh_token: str) -> "ConfigBuilder":
self._config["refresh_token"] = refresh_token
return self
def with_start_date(self, start_date: str) -> "ConfigBuilder":
self._config["start_date"] = start_date
return self
def with_end_date(self, end_date: str) -> "ConfigBuilder":
self._config["end_date"] = end_date
return self
def with_organization_ids(self, organization_ids: List[str]) -> "ConfigBuilder":
self._config["organization_ids"] = organization_ids
return self
def with_ad_account_ids(self, ad_account_ids: List[str]) -> "ConfigBuilder":
self._config["ad_account_ids"] = ad_account_ids
return self
def with_action_report_time(self, action_report_time: str) -> "ConfigBuilder":
self._config["action_report_time"] = action_report_time
return self
def with_swipe_up_attribution_window(self, window: str) -> "ConfigBuilder":
self._config["swipe_up_attribution_window"] = window
return self
def with_view_attribution_window(self, window: str) -> "ConfigBuilder":
self._config["view_attribution_window"] = window
return self
def build(self) -> MutableMapping[str, Any]:
return self._config

View File

@@ -0,0 +1,153 @@
#
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
#
from __future__ import annotations
from typing import Any, Dict, List, Optional, Union
from airbyte_cdk.test.mock_http.request import ANY_QUERY_PARAMS, HttpRequest
from .config import (
AD_ACCOUNT_ID,
AD_ID,
ADSQUAD_ID,
CAMPAIGN_ID,
ORGANIZATION_ID,
)
SNAPCHAT_API_URL = "https://adsapi.snapchat.com/v1"
OAUTH_TOKEN_URL = "https://accounts.snapchat.com/login/oauth2/access_token"
class OAuthRequestBuilder:
@classmethod
def oauth_endpoint(
cls,
client_id: str = "test_client_id",
client_secret: str = "test_client_secret",
refresh_token: str = "test_refresh_token",
) -> "OAuthRequestBuilder":
return cls(client_id, client_secret, refresh_token)
def __init__(
self,
client_id: str = "test_client_id",
client_secret: str = "test_client_secret",
refresh_token: str = "test_refresh_token",
) -> None:
self._body = f"grant_type=refresh_token&client_id={client_id}&client_secret={client_secret}&refresh_token={refresh_token}"
def build(self) -> HttpRequest:
return HttpRequest(
url=OAUTH_TOKEN_URL,
body=self._body,
)
class RequestBuilder:
@classmethod
def organizations_endpoint(cls, organization_id: str = "me") -> "RequestBuilder":
if organization_id == "me":
return cls(resource="me/organizations")
return cls(resource=f"organizations/{organization_id}")
@classmethod
def adaccounts_endpoint(cls, organization_id: str = ORGANIZATION_ID) -> "RequestBuilder":
return cls(resource=f"organizations/{organization_id}/adaccounts")
@classmethod
def adaccounts_by_id_endpoint(cls, ad_account_id: str = AD_ACCOUNT_ID) -> "RequestBuilder":
return cls(resource=f"adaccounts/{ad_account_id}")
@classmethod
def creatives_endpoint(cls, ad_account_id: str = AD_ACCOUNT_ID) -> "RequestBuilder":
return cls(resource=f"adaccounts/{ad_account_id}/creatives")
@classmethod
def ads_endpoint(cls, ad_account_id: str = AD_ACCOUNT_ID) -> "RequestBuilder":
return cls(resource=f"adaccounts/{ad_account_id}/ads")
@classmethod
def adsquads_endpoint(cls, ad_account_id: str = AD_ACCOUNT_ID) -> "RequestBuilder":
return cls(resource=f"adaccounts/{ad_account_id}/adsquads")
@classmethod
def segments_endpoint(cls, ad_account_id: str = AD_ACCOUNT_ID) -> "RequestBuilder":
return cls(resource=f"adaccounts/{ad_account_id}/segments")
@classmethod
def media_endpoint(cls, ad_account_id: str = AD_ACCOUNT_ID) -> "RequestBuilder":
return cls(resource=f"adaccounts/{ad_account_id}/media")
@classmethod
def campaigns_endpoint(cls, ad_account_id: str = AD_ACCOUNT_ID) -> "RequestBuilder":
return cls(resource=f"adaccounts/{ad_account_id}/campaigns")
@classmethod
def adaccounts_stats_endpoint(cls, ad_account_id: str = AD_ACCOUNT_ID) -> "RequestBuilder":
return cls(resource=f"adaccounts/{ad_account_id}/stats")
@classmethod
def ads_stats_endpoint(cls, ad_id: str = AD_ID) -> "RequestBuilder":
return cls(resource=f"ads/{ad_id}/stats")
@classmethod
def adsquads_stats_endpoint(cls, adsquad_id: str = ADSQUAD_ID) -> "RequestBuilder":
return cls(resource=f"adsquads/{adsquad_id}/stats")
@classmethod
def campaigns_stats_endpoint(cls, campaign_id: str = CAMPAIGN_ID) -> "RequestBuilder":
return cls(resource=f"campaigns/{campaign_id}/stats")
def __init__(self, resource: str = "") -> None:
self._resource = resource
self._query_params: Dict[str, Any] = {}
self._body = None
def with_query_param(self, key: str, value: Any) -> "RequestBuilder":
self._query_params[key] = value
return self
def with_granularity(self, granularity: str) -> "RequestBuilder":
self._query_params["granularity"] = granularity
return self
def with_fields(self, fields: str) -> "RequestBuilder":
self._query_params["fields"] = fields
return self
def with_start_time(self, start_time: str) -> "RequestBuilder":
self._query_params["start_time"] = start_time
return self
def with_end_time(self, end_time: str) -> "RequestBuilder":
self._query_params["end_time"] = end_time
return self
def with_action_report_time(self, action_report_time: str) -> "RequestBuilder":
self._query_params["action_report_time"] = action_report_time
return self
def with_view_attribution_window(self, window: str) -> "RequestBuilder":
self._query_params["view_attribution_window"] = window
return self
def with_swipe_up_attribution_window(self, window: str) -> "RequestBuilder":
self._query_params["swipe_up_attribution_window"] = window
return self
def with_any_query_params(self) -> "RequestBuilder":
self._any_query_params = True
return self
def build(self) -> HttpRequest:
query_params = (
ANY_QUERY_PARAMS if getattr(self, "_any_query_params", False) else (self._query_params if self._query_params else None)
)
return HttpRequest(
url=f"{SNAPCHAT_API_URL}/{self._resource}",
query_params=query_params,
body=self._body,
)

View File

@@ -0,0 +1,349 @@
#
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
#
import copy
import json
from http import HTTPStatus
from typing import Any, List, Optional
from airbyte_cdk.test.mock_http import HttpResponse
from airbyte_cdk.test.mock_http.response_builder import find_template
from .config import (
AD_ACCOUNT_ID,
AD_ID,
ADSQUAD_ID,
CAMPAIGN_ID,
ORGANIZATION_ID,
)
def _set_nested_value(obj: Any, key: str, value: Any) -> bool:
"""Recursively set a value in a nested structure."""
if isinstance(obj, dict):
if key in obj:
obj[key] = value
return True
for v in obj.values():
if _set_nested_value(v, key, value):
return True
elif isinstance(obj, list):
for item in obj:
if _set_nested_value(item, key, value):
return True
return False
def create_response(
resource_name: str,
status_code: int = 200,
has_next: bool = False,
next_link: Optional[str] = None,
) -> HttpResponse:
"""Create an HttpResponse from a JSON template file.
Args:
resource_name: Name of the JSON template file (without .json extension)
status_code: HTTP status code for the response
has_next: Whether to include pagination next_link
next_link: The URL for the next page
Returns:
HttpResponse with the template body
"""
body = copy.deepcopy(find_template(resource_name, __file__))
if has_next and next_link:
body["paging"] = {"next_link": next_link}
return HttpResponse(body=json.dumps(body), status_code=status_code)
def create_response_with_id(
resource_name: str,
record_id: str,
status_code: int = 200,
has_next: bool = False,
next_link: Optional[str] = None,
) -> HttpResponse:
"""Create an HttpResponse from a JSON template with a specific record ID."""
body = copy.deepcopy(find_template(resource_name, __file__))
_set_nested_value(body, "id", record_id)
if has_next and next_link:
body["paging"] = {"next_link": next_link}
return HttpResponse(body=json.dumps(body), status_code=status_code)
def create_empty_response(resource_name: str) -> HttpResponse:
"""Create an empty response for a given resource."""
body = copy.deepcopy(find_template(resource_name, __file__))
for key in body:
if isinstance(body[key], list) and key not in ["request_status", "request_id"]:
body[key] = []
break
return HttpResponse(body=json.dumps(body), status_code=200)
def create_error_response(status_code: HTTPStatus = HTTPStatus.UNAUTHORIZED) -> HttpResponse:
"""Create an error response from a JSON template."""
error_template_map = {
HTTPStatus.UNAUTHORIZED: "error_401",
HTTPStatus.TOO_MANY_REQUESTS: "error_429",
}
template_name = error_template_map.get(status_code)
if template_name:
body = copy.deepcopy(find_template(template_name, __file__))
else:
body = {"request_status": "ERROR", "request_id": "test_request_id", "msg": f"Error {status_code.value}"}
return HttpResponse(body=json.dumps(body), status_code=status_code.value)
def create_oauth_response() -> HttpResponse:
"""Create an OAuth token response from JSON template."""
body = copy.deepcopy(find_template("oauth_token", __file__))
return HttpResponse(body=json.dumps(body), status_code=200)
def create_stats_response(
resource_name: str,
entity_id: str,
granularity: str = "HOUR",
status_code: int = 200,
has_next: bool = False,
next_link: Optional[str] = None,
) -> HttpResponse:
"""Create a stats response with specific entity ID and granularity."""
body = copy.deepcopy(find_template(resource_name, __file__))
_set_nested_value(body, "id", entity_id)
_set_nested_value(body, "granularity", granularity)
if has_next and next_link:
body["paging"] = {"next_link": next_link}
return HttpResponse(body=json.dumps(body), status_code=status_code)
def create_multiple_records_response(
resource_name: str,
record_ids: List[str],
status_code: int = 200,
) -> HttpResponse:
"""Create a response with multiple records for testing substreams with multiple parents."""
template = find_template(resource_name, __file__)
body = copy.deepcopy(template)
data_key = None
record_template = None
for key in body:
if isinstance(body[key], list) and key not in ["request_status", "request_id"]:
data_key = key
if body[key]:
record_template = copy.deepcopy(body[key][0])
break
if data_key and record_template:
body[data_key] = []
for record_id in record_ids:
record = copy.deepcopy(record_template)
_set_nested_value(record, "id", record_id)
body[data_key].append(record)
return HttpResponse(body=json.dumps(body), status_code=status_code)
# Legacy helper functions that wrap the new template-based functions
# These maintain backward compatibility with existing tests
def oauth_response() -> HttpResponse:
"""Create an OAuth token response."""
return create_oauth_response()
def organizations_response(
organization_id: str = ORGANIZATION_ID,
has_next: bool = False,
next_link: Optional[str] = None,
) -> HttpResponse:
"""Create an organizations response using JSON template."""
return create_response_with_id("organizations", organization_id, has_next=has_next, next_link=next_link)
def adaccounts_response(
ad_account_id: str = AD_ACCOUNT_ID,
organization_id: str = ORGANIZATION_ID,
has_next: bool = False,
next_link: Optional[str] = None,
) -> HttpResponse:
"""Create an adaccounts response using JSON template."""
body = copy.deepcopy(find_template("adaccounts", __file__))
_set_nested_value(body, "id", ad_account_id)
_set_nested_value(body, "organization_id", organization_id)
_set_nested_value(body, "advertiser_organization_id", organization_id)
if has_next and next_link:
body["paging"] = {"next_link": next_link}
return HttpResponse(body=json.dumps(body), status_code=200)
def adaccounts_response_multiple(
ad_account_ids: List[str],
organization_id: str = ORGANIZATION_ID,
) -> HttpResponse:
"""Create response with multiple ad accounts for testing substreams with multiple parents."""
return create_multiple_records_response("adaccounts", ad_account_ids)
def organizations_response_multiple(
organization_ids: List[str],
) -> HttpResponse:
"""Create response with multiple organizations for testing substreams with multiple parents."""
return create_multiple_records_response("organizations", organization_ids)
def adsquads_response_multiple(
adsquad_ids: List[str],
) -> HttpResponse:
"""Create response with multiple adsquads for testing substreams with multiple parents."""
return create_multiple_records_response("adsquads", adsquad_ids)
def creatives_response(
creative_id: str = "test_creative_123",
ad_account_id: str = AD_ACCOUNT_ID,
has_next: bool = False,
next_link: Optional[str] = None,
) -> HttpResponse:
"""Create a creatives response using JSON template."""
body = copy.deepcopy(find_template("creatives", __file__))
_set_nested_value(body, "id", creative_id)
_set_nested_value(body, "ad_account_id", ad_account_id)
if has_next and next_link:
body["paging"] = {"next_link": next_link}
return HttpResponse(body=json.dumps(body), status_code=200)
def ads_response(
ad_id: str = AD_ID,
ad_account_id: str = AD_ACCOUNT_ID,
adsquad_id: str = ADSQUAD_ID,
has_next: bool = False,
next_link: Optional[str] = None,
) -> HttpResponse:
"""Create an ads response using JSON template."""
body = copy.deepcopy(find_template("ads", __file__))
_set_nested_value(body, "id", ad_id)
_set_nested_value(body, "ad_squad_id", adsquad_id)
if has_next and next_link:
body["paging"] = {"next_link": next_link}
return HttpResponse(body=json.dumps(body), status_code=200)
def adsquads_response(
adsquad_id: str = ADSQUAD_ID,
ad_account_id: str = AD_ACCOUNT_ID,
campaign_id: str = CAMPAIGN_ID,
has_next: bool = False,
next_link: Optional[str] = None,
) -> HttpResponse:
"""Create an adsquads response using JSON template."""
body = copy.deepcopy(find_template("adsquads", __file__))
_set_nested_value(body, "id", adsquad_id)
_set_nested_value(body, "campaign_id", campaign_id)
if has_next and next_link:
body["paging"] = {"next_link": next_link}
return HttpResponse(body=json.dumps(body), status_code=200)
def segments_response(
segment_id: str = "test_segment_123",
ad_account_id: str = AD_ACCOUNT_ID,
has_next: bool = False,
next_link: Optional[str] = None,
) -> HttpResponse:
"""Create a segments response using JSON template."""
body = copy.deepcopy(find_template("segments", __file__))
_set_nested_value(body, "id", segment_id)
_set_nested_value(body, "ad_account_id", ad_account_id)
if has_next and next_link:
body["paging"] = {"next_link": next_link}
return HttpResponse(body=json.dumps(body), status_code=200)
def media_response(
media_id: str = "test_media_123",
ad_account_id: str = AD_ACCOUNT_ID,
has_next: bool = False,
next_link: Optional[str] = None,
) -> HttpResponse:
"""Create a media response using JSON template."""
body = copy.deepcopy(find_template("media", __file__))
_set_nested_value(body, "id", media_id)
_set_nested_value(body, "ad_account_id", ad_account_id)
if has_next and next_link:
body["paging"] = {"next_link": next_link}
return HttpResponse(body=json.dumps(body), status_code=200)
def campaigns_response(
campaign_id: str = CAMPAIGN_ID,
ad_account_id: str = AD_ACCOUNT_ID,
has_next: bool = False,
next_link: Optional[str] = None,
) -> HttpResponse:
"""Create a campaigns response using JSON template."""
body = copy.deepcopy(find_template("campaigns", __file__))
_set_nested_value(body, "id", campaign_id)
_set_nested_value(body, "ad_account_id", ad_account_id)
if has_next and next_link:
body["paging"] = {"next_link": next_link}
return HttpResponse(body=json.dumps(body), status_code=200)
def stats_timeseries_response(
entity_id: str = AD_ACCOUNT_ID,
granularity: str = "HOUR",
has_next: bool = False,
next_link: Optional[str] = None,
) -> HttpResponse:
"""Create a stats timeseries response using JSON template."""
return create_stats_response("stats_timeseries", entity_id, granularity, has_next=has_next, next_link=next_link)
def stats_lifetime_response(
entity_id: str = AD_ACCOUNT_ID,
has_next: bool = False,
next_link: Optional[str] = None,
) -> HttpResponse:
"""Create a stats lifetime response using JSON template."""
return create_stats_response("stats_lifetime", entity_id, "LIFETIME", has_next=has_next, next_link=next_link)
def error_response(status_code: HTTPStatus = HTTPStatus.UNAUTHORIZED) -> HttpResponse:
"""Create an error response using JSON template."""
return create_error_response(status_code)
def empty_response(stream_key: str = "organizations") -> HttpResponse:
"""Create an empty response for a given stream using JSON template."""
return create_empty_response(stream_key)

View File

@@ -0,0 +1,248 @@
#
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
#
from http import HTTPStatus
from typing import List, Optional
from unittest import TestCase
from airbyte_cdk.models import AirbyteStateMessage, SyncMode
from airbyte_cdk.test.entrypoint_wrapper import EntrypointOutput
from airbyte_cdk.test.mock_http import HttpMocker
from airbyte_cdk.test.mock_http.request import HttpRequest
from airbyte_cdk.test.state_builder import StateBuilder
from .config import AD_ACCOUNT_ID, ORGANIZATION_ID, ConfigBuilder
from .request_builder import OAuthRequestBuilder, RequestBuilder
from .response_builder import (
adaccounts_response,
error_response,
oauth_response,
organizations_response,
organizations_response_multiple,
)
from .utils import config, read_output
_STREAM_NAME = "adaccounts"
def _read(
config_builder: ConfigBuilder,
sync_mode: SyncMode = SyncMode.full_refresh,
state: Optional[List[AirbyteStateMessage]] = None,
expecting_exception: bool = False,
) -> EntrypointOutput:
return read_output(
config_builder=config_builder,
stream_name=_STREAM_NAME,
sync_mode=sync_mode,
state=state,
expecting_exception=expecting_exception,
)
class TestAdaccounts(TestCase):
@HttpMocker()
def test_read_records(self, http_mocker: HttpMocker) -> None:
http_mocker.post(
OAuthRequestBuilder.oauth_endpoint().build(),
oauth_response(),
)
http_mocker.get(
RequestBuilder.organizations_endpoint("me").build(),
organizations_response(organization_id=ORGANIZATION_ID),
)
http_mocker.get(
RequestBuilder.adaccounts_endpoint(ORGANIZATION_ID).build(),
adaccounts_response(ad_account_id=AD_ACCOUNT_ID, organization_id=ORGANIZATION_ID),
)
output = _read(config_builder=config())
assert len(output.records) == 1
assert output.records[0].record.data["id"] == AD_ACCOUNT_ID
@HttpMocker()
def test_read_records_with_pagination(self, http_mocker: HttpMocker) -> None:
"""Test pagination for adaccounts stream.
NOTE: This test covers pagination for ALL streams in this connector
because they all use the same DefaultPaginator with identical
CursorPagination strategy (cursor_value from response.paging.next_link,
stop_condition when next_link is empty). Writing separate pagination
tests for each stream would be redundant.
"""
next_link = "https://adsapi.snapchat.com/v1/organizations/test_org_123/adaccounts?cursor=page2"
http_mocker.post(
OAuthRequestBuilder.oauth_endpoint().build(),
oauth_response(),
)
http_mocker.get(
RequestBuilder.organizations_endpoint("me").build(),
organizations_response(organization_id=ORGANIZATION_ID),
)
http_mocker.get(
RequestBuilder.adaccounts_endpoint(ORGANIZATION_ID).build(),
adaccounts_response(ad_account_id="adaccount_1", has_next=True, next_link=next_link),
)
http_mocker.get(
HttpRequest(url=next_link),
adaccounts_response(ad_account_id="adaccount_2", has_next=False),
)
output = _read(config_builder=config())
assert len(output.records) == 2
@HttpMocker()
def test_read_records_with_error_403_retry(self, http_mocker: HttpMocker) -> None:
"""Test that 403 errors trigger RETRY behavior with custom error message from manifest."""
http_mocker.post(
OAuthRequestBuilder.oauth_endpoint().build(),
oauth_response(),
)
http_mocker.get(
RequestBuilder.organizations_endpoint("me").build(),
organizations_response(organization_id=ORGANIZATION_ID),
)
# First request returns 403, then succeeds on retry
http_mocker.get(
RequestBuilder.adaccounts_endpoint(ORGANIZATION_ID).build(),
[
error_response(HTTPStatus.FORBIDDEN),
adaccounts_response(ad_account_id=AD_ACCOUNT_ID, organization_id=ORGANIZATION_ID),
],
)
output = _read(config_builder=config())
assert len(output.records) == 1
assert output.records[0].record.data["id"] == AD_ACCOUNT_ID
# Verify custom error message from manifest is logged
log_messages = [log.log.message for log in output.logs]
expected_error_prefix = "Got permission error when accessing URL. Skipping"
assert any(
expected_error_prefix in msg for msg in log_messages
), f"Expected custom 403 error message '{expected_error_prefix}' in logs"
assert any(_STREAM_NAME in msg for msg in log_messages), f"Expected stream name '{_STREAM_NAME}' in log messages"
class TestAdaccountsSubstreamMultipleParents(TestCase):
@HttpMocker()
def test_substream_with_two_parent_records(self, http_mocker: HttpMocker) -> None:
"""Test that substream correctly processes multiple parent records.
The adaccounts stream uses SubstreamPartitionRouter with organizations as parent.
This test verifies that adaccounts are fetched for each parent organization.
"""
org_1 = "org_001"
org_2 = "org_002"
http_mocker.post(
OAuthRequestBuilder.oauth_endpoint().build(),
oauth_response(),
)
http_mocker.get(
RequestBuilder.organizations_endpoint("me").build(),
organizations_response_multiple([org_1, org_2]),
)
# Mock adaccounts endpoint for each parent organization
http_mocker.get(
RequestBuilder.adaccounts_endpoint(org_1).build(),
adaccounts_response(ad_account_id="adaccount_from_org_1", organization_id=org_1),
)
http_mocker.get(
RequestBuilder.adaccounts_endpoint(org_2).build(),
adaccounts_response(ad_account_id="adaccount_from_org_2", organization_id=org_2),
)
output = _read(config_builder=config())
# Verify records from both parent organizations are returned
assert len(output.records) == 2
record_ids = [r.record.data.get("id") for r in output.records]
assert "adaccount_from_org_1" in record_ids
assert "adaccount_from_org_2" in record_ids
class TestAdaccountsIncremental(TestCase):
@HttpMocker()
def test_incremental_first_sync_emits_state(self, http_mocker: HttpMocker) -> None:
"""Test that first sync (no state) emits state message with cursor value."""
http_mocker.post(
OAuthRequestBuilder.oauth_endpoint().build(),
oauth_response(),
)
http_mocker.get(
RequestBuilder.organizations_endpoint("me").build(),
organizations_response(organization_id=ORGANIZATION_ID),
)
http_mocker.get(
RequestBuilder.adaccounts_endpoint(ORGANIZATION_ID).build(),
adaccounts_response(ad_account_id=AD_ACCOUNT_ID, organization_id=ORGANIZATION_ID),
)
output = _read(config_builder=config(), sync_mode=SyncMode.incremental)
assert len(output.state_messages) > 0, "Expected state messages to be emitted"
assert len(output.records) == 1
# Get latest record's cursor
latest_record = output.records[-1].record.data
record_cursor_value = latest_record.get("updated_at")
# Get state cursor
new_state = output.most_recent_state.stream_state.__dict__
state_cursor_value = new_state.get("updated_at") or new_state.get("state", {}).get("updated_at")
# Validate state matches record
assert state_cursor_value is not None, "Expected 'updated_at' in state"
assert record_cursor_value is not None, "Expected 'updated_at' in record"
assert state_cursor_value == record_cursor_value or state_cursor_value.startswith(
record_cursor_value[:10]
), f"Expected state to match latest record. State: {state_cursor_value}, Record: {record_cursor_value}"
@HttpMocker()
def test_incremental_sync_with_state(self, http_mocker: HttpMocker) -> None:
"""Test incremental sync with previous state.
This test validates:
- Connector accepts state from previous sync
- State is passed to both get_source() and read()
- Records are returned
- State advances to latest record's cursor value
"""
previous_state_date = "2024-01-15T00:00:00.000000Z"
state = StateBuilder().with_stream_state(_STREAM_NAME, {"updated_at": previous_state_date}).build()
http_mocker.post(
OAuthRequestBuilder.oauth_endpoint().build(),
oauth_response(),
)
http_mocker.get(
RequestBuilder.organizations_endpoint("me").build(),
organizations_response(organization_id=ORGANIZATION_ID),
)
http_mocker.get(
RequestBuilder.adaccounts_endpoint(ORGANIZATION_ID).build(),
adaccounts_response(ad_account_id=AD_ACCOUNT_ID, organization_id=ORGANIZATION_ID),
)
output = _read(config_builder=config(), sync_mode=SyncMode.incremental, state=state)
assert len(output.state_messages) > 0, "Expected state messages to be emitted"
assert len(output.records) == 1
# Get latest record's cursor
latest_record = output.records[-1].record.data
record_cursor_value = latest_record.get("updated_at")
# Get state cursor
new_state = output.most_recent_state.stream_state.__dict__
state_cursor_value = new_state.get("updated_at") or new_state.get("state", {}).get("updated_at")
# Validate state matches record
assert state_cursor_value is not None, "Expected 'updated_at' in state"
assert record_cursor_value is not None, "Expected 'updated_at' in record"
assert state_cursor_value == record_cursor_value or state_cursor_value.startswith(
record_cursor_value[:10]
), f"Expected state to match latest record. State: {state_cursor_value}, Record: {record_cursor_value}"

View File

@@ -0,0 +1,272 @@
#
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
#
from http import HTTPStatus
from typing import List, Optional
from unittest import TestCase
from airbyte_cdk.models import AirbyteStateMessage, SyncMode
from airbyte_cdk.test.entrypoint_wrapper import EntrypointOutput
from airbyte_cdk.test.mock_http import HttpMocker
from .config import AD_ACCOUNT_ID, ORGANIZATION_ID, ConfigBuilder
from .request_builder import OAuthRequestBuilder, RequestBuilder
from .response_builder import (
adaccounts_response,
adaccounts_response_multiple,
error_response,
oauth_response,
organizations_response,
stats_lifetime_response,
stats_timeseries_response,
)
from .utils import config, read_output
def _read(
config_builder: ConfigBuilder,
stream_name: str,
sync_mode: SyncMode = SyncMode.full_refresh,
state: Optional[List[AirbyteStateMessage]] = None,
expecting_exception: bool = False,
) -> EntrypointOutput:
return read_output(
config_builder=config_builder,
stream_name=stream_name,
sync_mode=sync_mode,
state=state,
expecting_exception=expecting_exception,
)
def _setup_parent_mocks(http_mocker: HttpMocker) -> None:
http_mocker.post(
OAuthRequestBuilder.oauth_endpoint().build(),
oauth_response(),
)
http_mocker.get(
RequestBuilder.organizations_endpoint("me").build(),
organizations_response(organization_id=ORGANIZATION_ID),
)
http_mocker.get(
RequestBuilder.adaccounts_endpoint(ORGANIZATION_ID).build(),
adaccounts_response(ad_account_id=AD_ACCOUNT_ID, organization_id=ORGANIZATION_ID),
)
def _setup_parent_mocks_multiple_adaccounts(http_mocker: HttpMocker, ad_account_ids: List[str]) -> None:
"""Setup parent mocks with multiple ad accounts for testing substreams."""
http_mocker.post(
OAuthRequestBuilder.oauth_endpoint().build(),
oauth_response(),
)
http_mocker.get(
RequestBuilder.organizations_endpoint("me").build(),
organizations_response(organization_id=ORGANIZATION_ID),
)
http_mocker.get(
RequestBuilder.adaccounts_endpoint(ORGANIZATION_ID).build(),
adaccounts_response_multiple(ad_account_ids=ad_account_ids, organization_id=ORGANIZATION_ID),
)
class TestAdaccountsStatsHourly(TestCase):
@HttpMocker()
def test_read_records(self, http_mocker: HttpMocker) -> None:
_setup_parent_mocks(http_mocker)
http_mocker.get(
RequestBuilder.adaccounts_stats_endpoint(AD_ACCOUNT_ID).with_any_query_params().build(),
stats_timeseries_response(entity_id=AD_ACCOUNT_ID, granularity="HOUR"),
)
output = _read(config_builder=config(), stream_name="adaccounts_stats_hourly")
# Enhanced assertions
assert len(output.records) == 5 # 5 weekly time slices (Jan 1-31 with step: P1W)
record = output.records[0].record.data
assert record.get("id") == AD_ACCOUNT_ID, f"Expected id={AD_ACCOUNT_ID}, got {record.get('id')}"
@HttpMocker()
def test_read_records_with_error_403_retry(self, http_mocker: HttpMocker) -> None:
"""Test that 403 errors trigger RETRY behavior with custom error message from manifest."""
_setup_parent_mocks(http_mocker)
# First request returns 403, then succeeds on retry
http_mocker.get(
RequestBuilder.adaccounts_stats_endpoint(AD_ACCOUNT_ID).with_any_query_params().build(),
[
error_response(HTTPStatus.FORBIDDEN),
stats_timeseries_response(entity_id=AD_ACCOUNT_ID, granularity="HOUR"),
],
)
output = _read(config_builder=config(), stream_name="adaccounts_stats_hourly")
assert len(output.records) == 5 # 5 weekly time slices
# Verify custom error message from manifest is logged
log_messages = [log.log.message for log in output.logs]
expected_error_prefix = "Got permission error when accessing URL. Skipping"
assert any(
expected_error_prefix in msg for msg in log_messages
), f"Expected custom 403 error message '{expected_error_prefix}' in logs"
class TestAdaccountsStatsDaily(TestCase):
@HttpMocker()
def test_read_records(self, http_mocker: HttpMocker) -> None:
_setup_parent_mocks(http_mocker)
http_mocker.get(
RequestBuilder.adaccounts_stats_endpoint(AD_ACCOUNT_ID).with_any_query_params().build(),
stats_timeseries_response(entity_id=AD_ACCOUNT_ID, granularity="DAY"),
)
output = _read(config_builder=config(), stream_name="adaccounts_stats_daily")
assert len(output.records) == 1 # Daily: step P1M = 1 monthly slice
record = output.records[0].record.data
assert record.get("id") == AD_ACCOUNT_ID, f"Expected id={AD_ACCOUNT_ID}, got {record.get('id')}"
class TestAdaccountsStatsLifetime(TestCase):
@HttpMocker()
def test_read_records(self, http_mocker: HttpMocker) -> None:
_setup_parent_mocks(http_mocker)
http_mocker.get(
RequestBuilder.adaccounts_stats_endpoint(AD_ACCOUNT_ID).with_any_query_params().build(),
stats_lifetime_response(entity_id=AD_ACCOUNT_ID),
)
output = _read(config_builder=config(), stream_name="adaccounts_stats_lifetime")
assert len(output.records) == 1 # Lifetime: no step
record = output.records[0].record.data
assert record.get("id") == AD_ACCOUNT_ID, f"Expected id={AD_ACCOUNT_ID}, got {record.get('id')}"
class TestAdaccountsStatsTransformations(TestCase):
@HttpMocker()
def test_transformations_add_fields(self, http_mocker: HttpMocker) -> None:
"""Test that AddFields transformations are applied correctly.
The manifest defines these transformations for adaccounts_stats_hourly:
- AddFields: id (from stream_slice['id'])
- AddFields: type = AD_ACCOUNT
- AddFields: granularity = HOUR
- AddFields: spend (from record.get('stats', {}).get('spend'))
- RemoveFields: stats
"""
_setup_parent_mocks(http_mocker)
http_mocker.get(
RequestBuilder.adaccounts_stats_endpoint(AD_ACCOUNT_ID).with_any_query_params().build(),
stats_timeseries_response(entity_id=AD_ACCOUNT_ID, granularity="HOUR"),
)
output = _read(config_builder=config(), stream_name="adaccounts_stats_hourly")
assert len(output.records) == 5 # 5 weekly time slices
record = output.records[0].record.data
# Verify AddFields transformations
assert record.get("id") == AD_ACCOUNT_ID
assert record.get("type") == "AD_ACCOUNT"
assert record.get("granularity") == "HOUR"
# Verify spend field is extracted from stats
assert "spend" in record
# Verify RemoveFields transformation - stats should be removed
assert "stats" not in record
class TestAdaccountsStatsSubstreamMultipleParents(TestCase):
@HttpMocker()
def test_substream_with_two_parent_records(self, http_mocker: HttpMocker) -> None:
"""Test that substream correctly processes multiple parent records.
The adaccounts_stats streams use SubstreamPartitionRouter with adaccounts as parent.
This test verifies that stats are fetched for each parent ad account.
"""
ad_account_1 = "adaccount_001"
ad_account_2 = "adaccount_002"
_setup_parent_mocks_multiple_adaccounts(http_mocker, [ad_account_1, ad_account_2])
# Mock stats endpoint for each parent ad account
http_mocker.get(
RequestBuilder.adaccounts_stats_endpoint(ad_account_1).with_any_query_params().build(),
stats_timeseries_response(entity_id=ad_account_1, granularity="HOUR"),
)
http_mocker.get(
RequestBuilder.adaccounts_stats_endpoint(ad_account_2).with_any_query_params().build(),
stats_timeseries_response(entity_id=ad_account_2, granularity="HOUR"),
)
output = _read(config_builder=config(), stream_name="adaccounts_stats_hourly")
# Verify records from both parent ad accounts are returned
assert len(output.records) == 10 # 2 parents × 5 weekly time slices = 10 records
record_ids = [r.record.data.get("id") for r in output.records]
assert ad_account_1 in record_ids
assert ad_account_2 in record_ids
class TestAdaccountsStatsIncremental(TestCase):
@HttpMocker()
def test_incremental_first_sync_emits_state(self, http_mocker: HttpMocker) -> None:
"""Test that first sync (no state) emits state message with cursor value."""
_setup_parent_mocks(http_mocker)
http_mocker.get(
RequestBuilder.adaccounts_stats_endpoint(AD_ACCOUNT_ID).with_any_query_params().build(),
stats_timeseries_response(entity_id=AD_ACCOUNT_ID, granularity="HOUR"),
)
output = _read(config_builder=config(), stream_name="adaccounts_stats_hourly", sync_mode=SyncMode.incremental)
assert len(output.state_messages) > 0, "Expected state messages to be emitted"
assert len(output.records) == 5 # 5 weekly time slices
# Get latest record's cursor
latest_record = output.records[-1].record.data
record_cursor_value = latest_record.get("start_time")
# Get state cursor
new_state = output.most_recent_state.stream_state.__dict__
state_cursor_value = new_state.get("start_time") or new_state.get("state", {}).get("start_time")
# Validate state matches record
assert state_cursor_value is not None, "Expected 'start_time' in state"
assert record_cursor_value is not None, "Expected 'start_time' in record"
assert state_cursor_value == record_cursor_value or state_cursor_value.startswith(
record_cursor_value[:10]
), f"Expected state to match latest record. State: {state_cursor_value}, Record: {record_cursor_value}"
@HttpMocker()
def test_incremental_sync_with_state(self, http_mocker: HttpMocker) -> None:
"""Test incremental sync with previous state for stats streams."""
from airbyte_cdk.test.state_builder import StateBuilder
previous_state_date = "2024-01-15T00:00:00.000000Z"
state = StateBuilder().with_stream_state("adaccounts_stats_hourly", {"start_time": previous_state_date}).build()
_setup_parent_mocks(http_mocker)
http_mocker.get(
RequestBuilder.adaccounts_stats_endpoint(AD_ACCOUNT_ID).with_any_query_params().build(),
stats_timeseries_response(entity_id=AD_ACCOUNT_ID, granularity="HOUR"),
)
output = _read(config_builder=config(), stream_name="adaccounts_stats_hourly", sync_mode=SyncMode.incremental, state=state)
assert len(output.state_messages) > 0, "Expected state messages to be emitted"
assert len(output.records) == 3 # 3 remaining weekly time slices after state date (Jan 15-31)
# Get latest record's cursor
latest_record = output.records[-1].record.data
record_cursor_value = latest_record.get("start_time")
# Get state cursor
new_state = output.most_recent_state.stream_state.__dict__
state_cursor_value = new_state.get("start_time") or new_state.get("state", {}).get("start_time")
# Validate state matches record
assert state_cursor_value is not None, "Expected 'start_time' in state"
assert record_cursor_value is not None, "Expected 'start_time' in record"
assert state_cursor_value == record_cursor_value or state_cursor_value.startswith(
record_cursor_value[:10]
), f"Expected state to match latest record. State: {state_cursor_value}, Record: {record_cursor_value}"

View File

@@ -0,0 +1,230 @@
#
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
#
from http import HTTPStatus
from typing import List, Optional
from unittest import TestCase
from airbyte_cdk.models import AirbyteStateMessage, SyncMode
from airbyte_cdk.test.entrypoint_wrapper import EntrypointOutput
from airbyte_cdk.test.mock_http import HttpMocker
from airbyte_cdk.test.state_builder import StateBuilder
from .config import AD_ACCOUNT_ID, AD_ID, ADSQUAD_ID, ORGANIZATION_ID, ConfigBuilder
from .request_builder import OAuthRequestBuilder, RequestBuilder
from .response_builder import (
adaccounts_response,
adaccounts_response_multiple,
ads_response,
error_response,
oauth_response,
organizations_response,
)
from .utils import config, read_output
_STREAM_NAME = "ads"
def _read(
config_builder: ConfigBuilder,
sync_mode: SyncMode = SyncMode.full_refresh,
state: Optional[List[AirbyteStateMessage]] = None,
expecting_exception: bool = False,
) -> EntrypointOutput:
return read_output(
config_builder=config_builder,
stream_name=_STREAM_NAME,
sync_mode=sync_mode,
state=state,
expecting_exception=expecting_exception,
)
class TestAds(TestCase):
@HttpMocker()
def test_read_records(self, http_mocker: HttpMocker) -> None:
http_mocker.post(
OAuthRequestBuilder.oauth_endpoint().build(),
oauth_response(),
)
http_mocker.get(
RequestBuilder.organizations_endpoint("me").build(),
organizations_response(organization_id=ORGANIZATION_ID),
)
http_mocker.get(
RequestBuilder.adaccounts_endpoint(ORGANIZATION_ID).build(),
adaccounts_response(ad_account_id=AD_ACCOUNT_ID, organization_id=ORGANIZATION_ID),
)
http_mocker.get(
RequestBuilder.ads_endpoint(AD_ACCOUNT_ID).build(),
ads_response(ad_id=AD_ID, ad_account_id=AD_ACCOUNT_ID, adsquad_id=ADSQUAD_ID),
)
output = _read(config_builder=config())
assert len(output.records) == 1
assert output.records[0].record.data["id"] == AD_ID
@HttpMocker()
def test_read_records_with_error_403_retry(self, http_mocker: HttpMocker) -> None:
"""Test that 403 errors trigger RETRY behavior with custom error message from manifest."""
http_mocker.post(
OAuthRequestBuilder.oauth_endpoint().build(),
oauth_response(),
)
http_mocker.get(
RequestBuilder.organizations_endpoint("me").build(),
organizations_response(organization_id=ORGANIZATION_ID),
)
http_mocker.get(
RequestBuilder.adaccounts_endpoint(ORGANIZATION_ID).build(),
adaccounts_response(ad_account_id=AD_ACCOUNT_ID, organization_id=ORGANIZATION_ID),
)
# First request returns 403, then succeeds on retry
http_mocker.get(
RequestBuilder.ads_endpoint(AD_ACCOUNT_ID).build(),
[
error_response(HTTPStatus.FORBIDDEN),
ads_response(ad_id=AD_ID, ad_account_id=AD_ACCOUNT_ID, adsquad_id=ADSQUAD_ID),
],
)
output = _read(config_builder=config())
assert len(output.records) == 1
assert output.records[0].record.data["id"] == AD_ID
# Verify custom error message from manifest is logged
log_messages = [log.log.message for log in output.logs]
expected_error_prefix = "Got permission error when accessing URL. Skipping"
assert any(
expected_error_prefix in msg for msg in log_messages
), f"Expected custom 403 error message '{expected_error_prefix}' in logs"
assert any(_STREAM_NAME in msg for msg in log_messages), f"Expected stream name '{_STREAM_NAME}' in log messages"
class TestAdsSubstreamMultipleParents(TestCase):
@HttpMocker()
def test_substream_with_two_parent_records(self, http_mocker: HttpMocker) -> None:
"""Test that substream correctly processes multiple parent records.
The ads stream uses SubstreamPartitionRouter with adaccounts as parent.
This test verifies that ads are fetched for each parent adaccount.
"""
adaccount_1 = "adaccount_001"
adaccount_2 = "adaccount_002"
http_mocker.post(
OAuthRequestBuilder.oauth_endpoint().build(),
oauth_response(),
)
http_mocker.get(
RequestBuilder.organizations_endpoint("me").build(),
organizations_response(organization_id=ORGANIZATION_ID),
)
http_mocker.get(
RequestBuilder.adaccounts_endpoint(ORGANIZATION_ID).build(),
adaccounts_response_multiple([adaccount_1, adaccount_2]),
)
# Mock ads endpoint for each parent adaccount
http_mocker.get(
RequestBuilder.ads_endpoint(adaccount_1).build(),
ads_response(ad_id="ad_from_adaccount_1", ad_account_id=adaccount_1),
)
http_mocker.get(
RequestBuilder.ads_endpoint(adaccount_2).build(),
ads_response(ad_id="ad_from_adaccount_2", ad_account_id=adaccount_2),
)
output = _read(config_builder=config())
# Verify records from both parent adaccounts are returned
assert len(output.records) == 2
record_ids = [r.record.data.get("id") for r in output.records]
assert "ad_from_adaccount_1" in record_ids
assert "ad_from_adaccount_2" in record_ids
class TestAdsIncremental(TestCase):
@HttpMocker()
def test_incremental_first_sync_emits_state(self, http_mocker: HttpMocker) -> None:
"""Test that first sync (no state) emits state message with cursor value."""
http_mocker.post(
OAuthRequestBuilder.oauth_endpoint().build(),
oauth_response(),
)
http_mocker.get(
RequestBuilder.organizations_endpoint("me").build(),
organizations_response(organization_id=ORGANIZATION_ID),
)
http_mocker.get(
RequestBuilder.adaccounts_endpoint(ORGANIZATION_ID).build(),
adaccounts_response(ad_account_id=AD_ACCOUNT_ID, organization_id=ORGANIZATION_ID),
)
http_mocker.get(
RequestBuilder.ads_endpoint(AD_ACCOUNT_ID).build(),
ads_response(ad_id=AD_ID, ad_account_id=AD_ACCOUNT_ID, adsquad_id=ADSQUAD_ID),
)
output = _read(config_builder=config(), sync_mode=SyncMode.incremental)
assert len(output.state_messages) > 0, "Expected state messages to be emitted"
assert len(output.records) == 1
# Get latest record's cursor
latest_record = output.records[-1].record.data
record_cursor_value = latest_record.get("updated_at")
# Get state cursor
new_state = output.most_recent_state.stream_state.__dict__
state_cursor_value = new_state.get("updated_at") or new_state.get("state", {}).get("updated_at")
# Validate state matches record
assert state_cursor_value is not None, "Expected 'updated_at' in state"
assert record_cursor_value is not None, "Expected 'updated_at' in record"
assert state_cursor_value == record_cursor_value or state_cursor_value.startswith(
record_cursor_value[:10]
), f"Expected state to match latest record. State: {state_cursor_value}, Record: {record_cursor_value}"
@HttpMocker()
def test_incremental_sync_with_state(self, http_mocker: HttpMocker) -> None:
"""Test incremental sync with previous state."""
previous_state_date = "2024-01-15T00:00:00.000000Z"
state = StateBuilder().with_stream_state(_STREAM_NAME, {"updated_at": previous_state_date}).build()
http_mocker.post(
OAuthRequestBuilder.oauth_endpoint().build(),
oauth_response(),
)
http_mocker.get(
RequestBuilder.organizations_endpoint("me").build(),
organizations_response(organization_id=ORGANIZATION_ID),
)
http_mocker.get(
RequestBuilder.adaccounts_endpoint(ORGANIZATION_ID).build(),
adaccounts_response(ad_account_id=AD_ACCOUNT_ID, organization_id=ORGANIZATION_ID),
)
http_mocker.get(
RequestBuilder.ads_endpoint(AD_ACCOUNT_ID).build(),
ads_response(ad_id=AD_ID, ad_account_id=AD_ACCOUNT_ID, adsquad_id=ADSQUAD_ID),
)
output = _read(config_builder=config(), sync_mode=SyncMode.incremental, state=state)
assert len(output.state_messages) > 0, "Expected state messages to be emitted"
assert len(output.records) == 1
# Get latest record's cursor
latest_record = output.records[-1].record.data
record_cursor_value = latest_record.get("updated_at")
# Get state cursor
new_state = output.most_recent_state.stream_state.__dict__
state_cursor_value = new_state.get("updated_at") or new_state.get("state", {}).get("updated_at")
# Validate state matches record
assert state_cursor_value is not None, "Expected 'updated_at' in state"
assert record_cursor_value is not None, "Expected 'updated_at' in record"
assert state_cursor_value == record_cursor_value or state_cursor_value.startswith(
record_cursor_value[:10]
), f"Expected state to match latest record. State: {state_cursor_value}, Record: {record_cursor_value}"

View File

@@ -0,0 +1,280 @@
#
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
#
from http import HTTPStatus
from typing import List, Optional
from unittest import TestCase
from airbyte_cdk.models import AirbyteStateMessage, SyncMode
from airbyte_cdk.test.entrypoint_wrapper import EntrypointOutput
from airbyte_cdk.test.mock_http import HttpMocker
from airbyte_cdk.test.state_builder import StateBuilder
from .config import AD_ACCOUNT_ID, AD_ID, ADSQUAD_ID, ORGANIZATION_ID, ConfigBuilder
from .request_builder import OAuthRequestBuilder, RequestBuilder
from .response_builder import (
adaccounts_response,
ads_response,
create_multiple_records_response,
error_response,
oauth_response,
organizations_response,
stats_lifetime_response,
stats_timeseries_response,
)
from .utils import config, read_output
def _read(
config_builder: ConfigBuilder,
stream_name: str,
sync_mode: SyncMode = SyncMode.full_refresh,
state: Optional[List[AirbyteStateMessage]] = None,
expecting_exception: bool = False,
) -> EntrypointOutput:
return read_output(
config_builder=config_builder,
stream_name=stream_name,
sync_mode=sync_mode,
state=state,
expecting_exception=expecting_exception,
)
def _setup_parent_mocks(http_mocker: HttpMocker) -> None:
http_mocker.post(
OAuthRequestBuilder.oauth_endpoint().build(),
oauth_response(),
)
http_mocker.get(
RequestBuilder.organizations_endpoint("me").build(),
organizations_response(organization_id=ORGANIZATION_ID),
)
http_mocker.get(
RequestBuilder.adaccounts_endpoint(ORGANIZATION_ID).build(),
adaccounts_response(ad_account_id=AD_ACCOUNT_ID, organization_id=ORGANIZATION_ID),
)
http_mocker.get(
RequestBuilder.ads_endpoint(AD_ACCOUNT_ID).build(),
ads_response(ad_id=AD_ID, ad_account_id=AD_ACCOUNT_ID, adsquad_id=ADSQUAD_ID),
)
def _setup_parent_mocks_multiple_ads(http_mocker: HttpMocker, ad_ids: List[str]) -> None:
"""Setup parent mocks with multiple ads for testing substreams."""
http_mocker.post(
OAuthRequestBuilder.oauth_endpoint().build(),
oauth_response(),
)
http_mocker.get(
RequestBuilder.organizations_endpoint("me").build(),
organizations_response(organization_id=ORGANIZATION_ID),
)
http_mocker.get(
RequestBuilder.adaccounts_endpoint(ORGANIZATION_ID).build(),
adaccounts_response(ad_account_id=AD_ACCOUNT_ID, organization_id=ORGANIZATION_ID),
)
http_mocker.get(
RequestBuilder.ads_endpoint(AD_ACCOUNT_ID).build(),
create_multiple_records_response("ads", ad_ids),
)
class TestAdsStatsHourly(TestCase):
@HttpMocker()
def test_read_records(self, http_mocker: HttpMocker) -> None:
_setup_parent_mocks(http_mocker)
http_mocker.get(
RequestBuilder.ads_stats_endpoint(AD_ID).with_any_query_params().build(),
stats_timeseries_response(entity_id=AD_ID, granularity="HOUR"),
)
output = _read(config_builder=config(), stream_name="ads_stats_hourly")
# Enhanced assertions
assert len(output.records) == 5 # 5 weekly time slices (Jan 1-31 with step: P1W)
record = output.records[0].record.data
assert record.get("id") == AD_ID, f"Expected id={AD_ID}, got {record.get('id')}"
@HttpMocker()
def test_read_records_with_error_403_retry(self, http_mocker: HttpMocker) -> None:
"""Test that 403 errors trigger RETRY behavior with custom error message from manifest."""
_setup_parent_mocks(http_mocker)
# First request returns 403, then succeeds on retry
http_mocker.get(
RequestBuilder.ads_stats_endpoint(AD_ID).with_any_query_params().build(),
[
error_response(HTTPStatus.FORBIDDEN),
stats_timeseries_response(entity_id=AD_ID, granularity="HOUR"),
],
)
output = _read(config_builder=config(), stream_name="ads_stats_hourly")
assert len(output.records) == 5 # 5 weekly time slices
# Verify custom error message from manifest is logged
log_messages = [log.log.message for log in output.logs]
expected_error_prefix = "Got permission error when accessing URL. Skipping"
assert any(
expected_error_prefix in msg for msg in log_messages
), f"Expected custom 403 error message '{expected_error_prefix}' in logs"
class TestAdsStatsDaily(TestCase):
@HttpMocker()
def test_read_records(self, http_mocker: HttpMocker) -> None:
_setup_parent_mocks(http_mocker)
http_mocker.get(
RequestBuilder.ads_stats_endpoint(AD_ID).with_any_query_params().build(),
stats_timeseries_response(entity_id=AD_ID, granularity="DAY"),
)
output = _read(config_builder=config(), stream_name="ads_stats_daily")
assert len(output.records) == 1 # Daily: step P1M = 1 monthly slice
record = output.records[0].record.data
assert record.get("id") == AD_ID, f"Expected id={AD_ID}, got {record.get('id')}"
class TestAdsStatsLifetime(TestCase):
@HttpMocker()
def test_read_records(self, http_mocker: HttpMocker) -> None:
_setup_parent_mocks(http_mocker)
http_mocker.get(
RequestBuilder.ads_stats_endpoint(AD_ID).with_any_query_params().build(),
stats_lifetime_response(entity_id=AD_ID),
)
output = _read(config_builder=config(), stream_name="ads_stats_lifetime")
assert len(output.records) == 1 # Lifetime: no step
record = output.records[0].record.data
assert record.get("id") == AD_ID, f"Expected id={AD_ID}, got {record.get('id')}"
class TestAdsStatsTransformations(TestCase):
@HttpMocker()
def test_transformations_add_fields(self, http_mocker: HttpMocker) -> None:
"""Test that AddFields transformations are applied correctly.
The manifest defines these transformations for ads_stats_hourly:
- AddFields: id (from stream_slice['id'])
- AddFields: type = AD
- AddFields: granularity = HOUR
- AddFields: spend (from record.get('stats', {}).get('spend'))
- RemoveFields: stats
"""
_setup_parent_mocks(http_mocker)
http_mocker.get(
RequestBuilder.ads_stats_endpoint(AD_ID).with_any_query_params().build(),
stats_timeseries_response(entity_id=AD_ID, granularity="HOUR"),
)
output = _read(config_builder=config(), stream_name="ads_stats_hourly")
assert len(output.records) == 5 # 5 weekly time slices
record = output.records[0].record.data
# Verify AddFields transformations
assert record.get("id") == AD_ID
assert record.get("type") == "AD"
assert record.get("granularity") == "HOUR"
# Verify spend field is extracted from stats
assert "spend" in record
# Verify RemoveFields transformation - stats should be removed
assert "stats" not in record
class TestAdsStatsSubstreamMultipleParents(TestCase):
@HttpMocker()
def test_substream_with_two_parent_records(self, http_mocker: HttpMocker) -> None:
"""Test that substream correctly processes multiple parent records.
The ads_stats streams use SubstreamPartitionRouter with ads as parent.
This test verifies that stats are fetched for each parent ad.
"""
ad_1 = "ad_001"
ad_2 = "ad_002"
_setup_parent_mocks_multiple_ads(http_mocker, [ad_1, ad_2])
# Mock stats endpoint for each parent ad
http_mocker.get(
RequestBuilder.ads_stats_endpoint(ad_1).with_any_query_params().build(),
stats_timeseries_response(entity_id=ad_1, granularity="HOUR"),
)
http_mocker.get(
RequestBuilder.ads_stats_endpoint(ad_2).with_any_query_params().build(),
stats_timeseries_response(entity_id=ad_2, granularity="HOUR"),
)
output = _read(config_builder=config(), stream_name="ads_stats_hourly")
# Verify records from both parent ads are returned
assert len(output.records) == 10 # 2 parents × 5 weekly time slices = 10 records
record_ids = [r.record.data.get("id") for r in output.records]
assert ad_1 in record_ids
assert ad_2 in record_ids
class TestAdsStatsIncremental(TestCase):
@HttpMocker()
def test_incremental_first_sync_emits_state(self, http_mocker: HttpMocker) -> None:
"""Test that first sync (no state) emits state message with cursor value."""
_setup_parent_mocks(http_mocker)
http_mocker.get(
RequestBuilder.ads_stats_endpoint(AD_ID).with_any_query_params().build(),
stats_timeseries_response(entity_id=AD_ID, granularity="HOUR"),
)
output = _read(config_builder=config(), stream_name="ads_stats_hourly", sync_mode=SyncMode.incremental)
assert len(output.state_messages) > 0, "Expected state messages to be emitted"
assert len(output.records) == 5 # 5 weekly time slices
# Get latest record's cursor
latest_record = output.records[-1].record.data
record_cursor_value = latest_record.get("start_time")
# Get state cursor
new_state = output.most_recent_state.stream_state.__dict__
state_cursor_value = new_state.get("start_time") or new_state.get("state", {}).get("start_time")
# Validate state matches record
assert state_cursor_value is not None, "Expected 'start_time' in state"
assert record_cursor_value is not None, "Expected 'start_time' in record"
assert state_cursor_value == record_cursor_value or state_cursor_value.startswith(
record_cursor_value[:10]
), f"Expected state to match latest record. State: {state_cursor_value}, Record: {record_cursor_value}"
@HttpMocker()
def test_incremental_sync_with_state(self, http_mocker: HttpMocker) -> None:
"""Test incremental sync with previous state for stats streams."""
previous_state_date = "2024-01-15T00:00:00.000000Z"
state = StateBuilder().with_stream_state("ads_stats_hourly", {"start_time": previous_state_date}).build()
_setup_parent_mocks(http_mocker)
http_mocker.get(
RequestBuilder.ads_stats_endpoint(AD_ID).with_any_query_params().build(),
stats_timeseries_response(entity_id=AD_ID, granularity="HOUR"),
)
output = _read(config_builder=config(), stream_name="ads_stats_hourly", sync_mode=SyncMode.incremental, state=state)
assert len(output.state_messages) > 0, "Expected state messages to be emitted"
assert len(output.records) == 3 # 3 remaining weekly time slices after state date (Jan 15-31)
# Get latest record's cursor
latest_record = output.records[-1].record.data
record_cursor_value = latest_record.get("start_time")
# Get state cursor
new_state = output.most_recent_state.stream_state.__dict__
state_cursor_value = new_state.get("start_time") or new_state.get("state", {}).get("start_time")
# Validate state matches record
assert state_cursor_value is not None, "Expected 'start_time' in state"
assert record_cursor_value is not None, "Expected 'start_time' in record"
assert state_cursor_value == record_cursor_value or state_cursor_value.startswith(
record_cursor_value[:10]
), f"Expected state to match latest record. State: {state_cursor_value}, Record: {record_cursor_value}"

View File

@@ -0,0 +1,230 @@
#
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
#
from http import HTTPStatus
from typing import List, Optional
from unittest import TestCase
from airbyte_cdk.models import AirbyteStateMessage, SyncMode
from airbyte_cdk.test.entrypoint_wrapper import EntrypointOutput
from airbyte_cdk.test.mock_http import HttpMocker
from airbyte_cdk.test.state_builder import StateBuilder
from .config import AD_ACCOUNT_ID, ADSQUAD_ID, CAMPAIGN_ID, ORGANIZATION_ID, ConfigBuilder
from .request_builder import OAuthRequestBuilder, RequestBuilder
from .response_builder import (
adaccounts_response,
adaccounts_response_multiple,
adsquads_response,
error_response,
oauth_response,
organizations_response,
)
from .utils import config, read_output
_STREAM_NAME = "adsquads"
def _read(
config_builder: ConfigBuilder,
sync_mode: SyncMode = SyncMode.full_refresh,
state: Optional[List[AirbyteStateMessage]] = None,
expecting_exception: bool = False,
) -> EntrypointOutput:
return read_output(
config_builder=config_builder,
stream_name=_STREAM_NAME,
sync_mode=sync_mode,
state=state,
expecting_exception=expecting_exception,
)
class TestAdsquads(TestCase):
@HttpMocker()
def test_read_records(self, http_mocker: HttpMocker) -> None:
http_mocker.post(
OAuthRequestBuilder.oauth_endpoint().build(),
oauth_response(),
)
http_mocker.get(
RequestBuilder.organizations_endpoint("me").build(),
organizations_response(organization_id=ORGANIZATION_ID),
)
http_mocker.get(
RequestBuilder.adaccounts_endpoint(ORGANIZATION_ID).build(),
adaccounts_response(ad_account_id=AD_ACCOUNT_ID, organization_id=ORGANIZATION_ID),
)
http_mocker.get(
RequestBuilder.adsquads_endpoint(AD_ACCOUNT_ID).build(),
adsquads_response(adsquad_id=ADSQUAD_ID, ad_account_id=AD_ACCOUNT_ID, campaign_id=CAMPAIGN_ID),
)
output = _read(config_builder=config())
assert len(output.records) == 1
assert output.records[0].record.data["id"] == ADSQUAD_ID
@HttpMocker()
def test_read_records_with_error_403_retry(self, http_mocker: HttpMocker) -> None:
"""Test that 403 errors trigger RETRY behavior with custom error message from manifest."""
http_mocker.post(
OAuthRequestBuilder.oauth_endpoint().build(),
oauth_response(),
)
http_mocker.get(
RequestBuilder.organizations_endpoint("me").build(),
organizations_response(organization_id=ORGANIZATION_ID),
)
http_mocker.get(
RequestBuilder.adaccounts_endpoint(ORGANIZATION_ID).build(),
adaccounts_response(ad_account_id=AD_ACCOUNT_ID, organization_id=ORGANIZATION_ID),
)
# First request returns 403, then succeeds on retry
http_mocker.get(
RequestBuilder.adsquads_endpoint(AD_ACCOUNT_ID).build(),
[
error_response(HTTPStatus.FORBIDDEN),
adsquads_response(adsquad_id=ADSQUAD_ID, ad_account_id=AD_ACCOUNT_ID, campaign_id=CAMPAIGN_ID),
],
)
output = _read(config_builder=config())
assert len(output.records) == 1
assert output.records[0].record.data["id"] == ADSQUAD_ID
# Verify custom error message from manifest is logged
log_messages = [log.log.message for log in output.logs]
expected_error_prefix = "Got permission error when accessing URL. Skipping"
assert any(
expected_error_prefix in msg for msg in log_messages
), f"Expected custom 403 error message '{expected_error_prefix}' in logs"
assert any(_STREAM_NAME in msg for msg in log_messages), f"Expected stream name '{_STREAM_NAME}' in log messages"
class TestAdsquadsSubstreamMultipleParents(TestCase):
@HttpMocker()
def test_substream_with_two_parent_records(self, http_mocker: HttpMocker) -> None:
"""Test that substream correctly processes multiple parent records.
The adsquads stream uses SubstreamPartitionRouter with adaccounts as parent.
This test verifies that adsquads are fetched for each parent adaccount.
"""
adaccount_1 = "adaccount_001"
adaccount_2 = "adaccount_002"
http_mocker.post(
OAuthRequestBuilder.oauth_endpoint().build(),
oauth_response(),
)
http_mocker.get(
RequestBuilder.organizations_endpoint("me").build(),
organizations_response(organization_id=ORGANIZATION_ID),
)
http_mocker.get(
RequestBuilder.adaccounts_endpoint(ORGANIZATION_ID).build(),
adaccounts_response_multiple([adaccount_1, adaccount_2]),
)
# Mock adsquads endpoint for each parent adaccount
http_mocker.get(
RequestBuilder.adsquads_endpoint(adaccount_1).build(),
adsquads_response(adsquad_id="adsquad_from_adaccount_1", ad_account_id=adaccount_1),
)
http_mocker.get(
RequestBuilder.adsquads_endpoint(adaccount_2).build(),
adsquads_response(adsquad_id="adsquad_from_adaccount_2", ad_account_id=adaccount_2),
)
output = _read(config_builder=config())
# Verify records from both parent adaccounts are returned
assert len(output.records) == 2
record_ids = [r.record.data.get("id") for r in output.records]
assert "adsquad_from_adaccount_1" in record_ids
assert "adsquad_from_adaccount_2" in record_ids
class TestAdsquadsIncremental(TestCase):
@HttpMocker()
def test_incremental_first_sync_emits_state(self, http_mocker: HttpMocker) -> None:
"""Test that first sync (no state) emits state message with cursor value."""
http_mocker.post(
OAuthRequestBuilder.oauth_endpoint().build(),
oauth_response(),
)
http_mocker.get(
RequestBuilder.organizations_endpoint("me").build(),
organizations_response(organization_id=ORGANIZATION_ID),
)
http_mocker.get(
RequestBuilder.adaccounts_endpoint(ORGANIZATION_ID).build(),
adaccounts_response(ad_account_id=AD_ACCOUNT_ID, organization_id=ORGANIZATION_ID),
)
http_mocker.get(
RequestBuilder.adsquads_endpoint(AD_ACCOUNT_ID).build(),
adsquads_response(adsquad_id=ADSQUAD_ID, ad_account_id=AD_ACCOUNT_ID, campaign_id=CAMPAIGN_ID),
)
output = _read(config_builder=config(), sync_mode=SyncMode.incremental)
assert len(output.state_messages) > 0, "Expected state messages to be emitted"
assert len(output.records) == 1
# Get latest record's cursor
latest_record = output.records[-1].record.data
record_cursor_value = latest_record.get("updated_at")
# Get state cursor
new_state = output.most_recent_state.stream_state.__dict__
state_cursor_value = new_state.get("updated_at") or new_state.get("state", {}).get("updated_at")
# Validate state matches record
assert state_cursor_value is not None, "Expected 'updated_at' in state"
assert record_cursor_value is not None, "Expected 'updated_at' in record"
assert state_cursor_value == record_cursor_value or state_cursor_value.startswith(
record_cursor_value[:10]
), f"Expected state to match latest record. State: {state_cursor_value}, Record: {record_cursor_value}"
@HttpMocker()
def test_incremental_sync_with_state(self, http_mocker: HttpMocker) -> None:
"""Test incremental sync with previous state."""
previous_state_date = "2024-01-15T00:00:00.000000Z"
state = StateBuilder().with_stream_state(_STREAM_NAME, {"updated_at": previous_state_date}).build()
http_mocker.post(
OAuthRequestBuilder.oauth_endpoint().build(),
oauth_response(),
)
http_mocker.get(
RequestBuilder.organizations_endpoint("me").build(),
organizations_response(organization_id=ORGANIZATION_ID),
)
http_mocker.get(
RequestBuilder.adaccounts_endpoint(ORGANIZATION_ID).build(),
adaccounts_response(ad_account_id=AD_ACCOUNT_ID, organization_id=ORGANIZATION_ID),
)
http_mocker.get(
RequestBuilder.adsquads_endpoint(AD_ACCOUNT_ID).build(),
adsquads_response(adsquad_id=ADSQUAD_ID, ad_account_id=AD_ACCOUNT_ID, campaign_id=CAMPAIGN_ID),
)
output = _read(config_builder=config(), sync_mode=SyncMode.incremental, state=state)
assert len(output.state_messages) > 0, "Expected state messages to be emitted"
assert len(output.records) == 1
# Get latest record's cursor
latest_record = output.records[-1].record.data
record_cursor_value = latest_record.get("updated_at")
# Get state cursor
new_state = output.most_recent_state.stream_state.__dict__
state_cursor_value = new_state.get("updated_at") or new_state.get("state", {}).get("updated_at")
# Validate state matches record
assert state_cursor_value is not None, "Expected 'updated_at' in state"
assert record_cursor_value is not None, "Expected 'updated_at' in record"
assert state_cursor_value == record_cursor_value or state_cursor_value.startswith(
record_cursor_value[:10]
), f"Expected state to match latest record. State: {state_cursor_value}, Record: {record_cursor_value}"

View File

@@ -0,0 +1,280 @@
#
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
#
from http import HTTPStatus
from typing import List, Optional
from unittest import TestCase
from airbyte_cdk.models import AirbyteStateMessage, SyncMode
from airbyte_cdk.test.entrypoint_wrapper import EntrypointOutput
from airbyte_cdk.test.mock_http import HttpMocker
from airbyte_cdk.test.state_builder import StateBuilder
from .config import AD_ACCOUNT_ID, ADSQUAD_ID, CAMPAIGN_ID, ORGANIZATION_ID, ConfigBuilder
from .request_builder import OAuthRequestBuilder, RequestBuilder
from .response_builder import (
adaccounts_response,
adsquads_response,
create_multiple_records_response,
error_response,
oauth_response,
organizations_response,
stats_lifetime_response,
stats_timeseries_response,
)
from .utils import config, read_output
def _read(
config_builder: ConfigBuilder,
stream_name: str,
sync_mode: SyncMode = SyncMode.full_refresh,
state: Optional[List[AirbyteStateMessage]] = None,
expecting_exception: bool = False,
) -> EntrypointOutput:
return read_output(
config_builder=config_builder,
stream_name=stream_name,
sync_mode=sync_mode,
state=state,
expecting_exception=expecting_exception,
)
def _setup_parent_mocks(http_mocker: HttpMocker) -> None:
http_mocker.post(
OAuthRequestBuilder.oauth_endpoint().build(),
oauth_response(),
)
http_mocker.get(
RequestBuilder.organizations_endpoint("me").build(),
organizations_response(organization_id=ORGANIZATION_ID),
)
http_mocker.get(
RequestBuilder.adaccounts_endpoint(ORGANIZATION_ID).build(),
adaccounts_response(ad_account_id=AD_ACCOUNT_ID, organization_id=ORGANIZATION_ID),
)
http_mocker.get(
RequestBuilder.adsquads_endpoint(AD_ACCOUNT_ID).build(),
adsquads_response(adsquad_id=ADSQUAD_ID, ad_account_id=AD_ACCOUNT_ID, campaign_id=CAMPAIGN_ID),
)
def _setup_parent_mocks_multiple_adsquads(http_mocker: HttpMocker, adsquad_ids: List[str]) -> None:
"""Setup parent mocks with multiple adsquads for testing substreams."""
http_mocker.post(
OAuthRequestBuilder.oauth_endpoint().build(),
oauth_response(),
)
http_mocker.get(
RequestBuilder.organizations_endpoint("me").build(),
organizations_response(organization_id=ORGANIZATION_ID),
)
http_mocker.get(
RequestBuilder.adaccounts_endpoint(ORGANIZATION_ID).build(),
adaccounts_response(ad_account_id=AD_ACCOUNT_ID, organization_id=ORGANIZATION_ID),
)
http_mocker.get(
RequestBuilder.adsquads_endpoint(AD_ACCOUNT_ID).build(),
create_multiple_records_response("adsquads", adsquad_ids),
)
class TestAdsquadsStatsHourly(TestCase):
@HttpMocker()
def test_read_records(self, http_mocker: HttpMocker) -> None:
_setup_parent_mocks(http_mocker)
http_mocker.get(
RequestBuilder.adsquads_stats_endpoint(ADSQUAD_ID).with_any_query_params().build(),
stats_timeseries_response(entity_id=ADSQUAD_ID, granularity="HOUR"),
)
output = _read(config_builder=config(), stream_name="adsquads_stats_hourly")
# Enhanced assertions
assert len(output.records) == 5 # 5 weekly time slices (Jan 1-31 with step: P1W)
record = output.records[0].record.data
assert record.get("id") == ADSQUAD_ID, f"Expected id={ADSQUAD_ID}, got {record.get('id')}"
@HttpMocker()
def test_read_records_with_error_403_retry(self, http_mocker: HttpMocker) -> None:
"""Test that 403 errors trigger RETRY behavior with custom error message from manifest."""
_setup_parent_mocks(http_mocker)
# First request returns 403, then succeeds on retry
http_mocker.get(
RequestBuilder.adsquads_stats_endpoint(ADSQUAD_ID).with_any_query_params().build(),
[
error_response(HTTPStatus.FORBIDDEN),
stats_timeseries_response(entity_id=ADSQUAD_ID, granularity="HOUR"),
],
)
output = _read(config_builder=config(), stream_name="adsquads_stats_hourly")
assert len(output.records) == 5 # 5 weekly time slices
# Verify custom error message from manifest is logged
log_messages = [log.log.message for log in output.logs]
expected_error_prefix = "Got permission error when accessing URL. Skipping"
assert any(
expected_error_prefix in msg for msg in log_messages
), f"Expected custom 403 error message '{expected_error_prefix}' in logs"
class TestAdsquadsStatsDaily(TestCase):
@HttpMocker()
def test_read_records(self, http_mocker: HttpMocker) -> None:
_setup_parent_mocks(http_mocker)
http_mocker.get(
RequestBuilder.adsquads_stats_endpoint(ADSQUAD_ID).with_any_query_params().build(),
stats_timeseries_response(entity_id=ADSQUAD_ID, granularity="DAY"),
)
output = _read(config_builder=config(), stream_name="adsquads_stats_daily")
assert len(output.records) == 1 # Daily: step P1M = 1 monthly slice
record = output.records[0].record.data
assert record.get("id") == ADSQUAD_ID, f"Expected id={ADSQUAD_ID}, got {record.get('id')}"
class TestAdsquadsStatsLifetime(TestCase):
@HttpMocker()
def test_read_records(self, http_mocker: HttpMocker) -> None:
_setup_parent_mocks(http_mocker)
http_mocker.get(
RequestBuilder.adsquads_stats_endpoint(ADSQUAD_ID).with_any_query_params().build(),
stats_lifetime_response(entity_id=ADSQUAD_ID),
)
output = _read(config_builder=config(), stream_name="adsquads_stats_lifetime")
assert len(output.records) == 1 # Lifetime: no step
record = output.records[0].record.data
assert record.get("id") == ADSQUAD_ID, f"Expected id={ADSQUAD_ID}, got {record.get('id')}"
class TestAdsquadsStatsTransformations(TestCase):
@HttpMocker()
def test_transformations_add_fields(self, http_mocker: HttpMocker) -> None:
"""Test that AddFields transformations are applied correctly.
The manifest defines these transformations for adsquads_stats_hourly:
- AddFields: id (from stream_slice['id'])
- AddFields: type = AD_SQUAD
- AddFields: granularity = HOUR
- AddFields: spend (from record.get('stats', {}).get('spend'))
- RemoveFields: stats
"""
_setup_parent_mocks(http_mocker)
http_mocker.get(
RequestBuilder.adsquads_stats_endpoint(ADSQUAD_ID).with_any_query_params().build(),
stats_timeseries_response(entity_id=ADSQUAD_ID, granularity="HOUR"),
)
output = _read(config_builder=config(), stream_name="adsquads_stats_hourly")
assert len(output.records) == 5 # 5 weekly time slices
record = output.records[0].record.data
# Verify AddFields transformations
assert record.get("id") == ADSQUAD_ID
assert record.get("type") == "AD_SQUAD"
assert record.get("granularity") == "HOUR"
# Verify spend field is extracted from stats
assert "spend" in record
# Verify RemoveFields transformation - stats should be removed
assert "stats" not in record
class TestAdsquadsStatsSubstreamMultipleParents(TestCase):
@HttpMocker()
def test_substream_with_two_parent_records(self, http_mocker: HttpMocker) -> None:
"""Test that substream correctly processes multiple parent records.
The adsquads_stats streams use SubstreamPartitionRouter with adsquads as parent.
This test verifies that stats are fetched for each parent adsquad.
"""
adsquad_1 = "adsquad_001"
adsquad_2 = "adsquad_002"
_setup_parent_mocks_multiple_adsquads(http_mocker, [adsquad_1, adsquad_2])
# Mock stats endpoint for each parent adsquad
http_mocker.get(
RequestBuilder.adsquads_stats_endpoint(adsquad_1).with_any_query_params().build(),
stats_timeseries_response(entity_id=adsquad_1, granularity="HOUR"),
)
http_mocker.get(
RequestBuilder.adsquads_stats_endpoint(adsquad_2).with_any_query_params().build(),
stats_timeseries_response(entity_id=adsquad_2, granularity="HOUR"),
)
output = _read(config_builder=config(), stream_name="adsquads_stats_hourly")
# Verify records from both parent adsquads are returned
assert len(output.records) == 10 # 2 parents × 5 weekly time slices = 10 records
record_ids = [r.record.data.get("id") for r in output.records]
assert adsquad_1 in record_ids
assert adsquad_2 in record_ids
class TestAdsquadsStatsIncremental(TestCase):
@HttpMocker()
def test_incremental_first_sync_emits_state(self, http_mocker: HttpMocker) -> None:
"""Test that first sync (no state) emits state message with cursor value."""
_setup_parent_mocks(http_mocker)
http_mocker.get(
RequestBuilder.adsquads_stats_endpoint(ADSQUAD_ID).with_any_query_params().build(),
stats_timeseries_response(entity_id=ADSQUAD_ID, granularity="HOUR"),
)
output = _read(config_builder=config(), stream_name="adsquads_stats_hourly", sync_mode=SyncMode.incremental)
assert len(output.state_messages) > 0, "Expected state messages to be emitted"
assert len(output.records) == 5 # 5 weekly time slices
# Get latest record's cursor
latest_record = output.records[-1].record.data
record_cursor_value = latest_record.get("start_time")
# Get state cursor
new_state = output.most_recent_state.stream_state.__dict__
state_cursor_value = new_state.get("start_time") or new_state.get("state", {}).get("start_time")
# Validate state matches record
assert state_cursor_value is not None, "Expected 'start_time' in state"
assert record_cursor_value is not None, "Expected 'start_time' in record"
assert state_cursor_value == record_cursor_value or state_cursor_value.startswith(
record_cursor_value[:10]
), f"Expected state to match latest record. State: {state_cursor_value}, Record: {record_cursor_value}"
@HttpMocker()
def test_incremental_sync_with_state(self, http_mocker: HttpMocker) -> None:
"""Test incremental sync with previous state for stats streams."""
previous_state_date = "2024-01-15T00:00:00.000000Z"
state = StateBuilder().with_stream_state("adsquads_stats_hourly", {"start_time": previous_state_date}).build()
_setup_parent_mocks(http_mocker)
http_mocker.get(
RequestBuilder.adsquads_stats_endpoint(ADSQUAD_ID).with_any_query_params().build(),
stats_timeseries_response(entity_id=ADSQUAD_ID, granularity="HOUR"),
)
output = _read(config_builder=config(), stream_name="adsquads_stats_hourly", sync_mode=SyncMode.incremental, state=state)
assert len(output.state_messages) > 0, "Expected state messages to be emitted"
assert len(output.records) == 3 # 3 remaining weekly time slices after state date (Jan 15-31)
# Get latest record's cursor
latest_record = output.records[-1].record.data
record_cursor_value = latest_record.get("start_time")
# Get state cursor
new_state = output.most_recent_state.stream_state.__dict__
state_cursor_value = new_state.get("start_time") or new_state.get("state", {}).get("start_time")
# Validate state matches record
assert state_cursor_value is not None, "Expected 'start_time' in state"
assert record_cursor_value is not None, "Expected 'start_time' in record"
assert state_cursor_value == record_cursor_value or state_cursor_value.startswith(
record_cursor_value[:10]
), f"Expected state to match latest record. State: {state_cursor_value}, Record: {record_cursor_value}"

View File

@@ -0,0 +1,230 @@
#
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
#
from http import HTTPStatus
from typing import List, Optional
from unittest import TestCase
from airbyte_cdk.models import AirbyteStateMessage, SyncMode
from airbyte_cdk.test.entrypoint_wrapper import EntrypointOutput
from airbyte_cdk.test.mock_http import HttpMocker
from airbyte_cdk.test.state_builder import StateBuilder
from .config import AD_ACCOUNT_ID, CAMPAIGN_ID, ORGANIZATION_ID, ConfigBuilder
from .request_builder import OAuthRequestBuilder, RequestBuilder
from .response_builder import (
adaccounts_response,
adaccounts_response_multiple,
campaigns_response,
error_response,
oauth_response,
organizations_response,
)
from .utils import config, read_output
_STREAM_NAME = "campaigns"
def _read(
config_builder: ConfigBuilder,
sync_mode: SyncMode = SyncMode.full_refresh,
state: Optional[List[AirbyteStateMessage]] = None,
expecting_exception: bool = False,
) -> EntrypointOutput:
return read_output(
config_builder=config_builder,
stream_name=_STREAM_NAME,
sync_mode=sync_mode,
state=state,
expecting_exception=expecting_exception,
)
class TestCampaigns(TestCase):
@HttpMocker()
def test_read_records(self, http_mocker: HttpMocker) -> None:
http_mocker.post(
OAuthRequestBuilder.oauth_endpoint().build(),
oauth_response(),
)
http_mocker.get(
RequestBuilder.organizations_endpoint("me").build(),
organizations_response(organization_id=ORGANIZATION_ID),
)
http_mocker.get(
RequestBuilder.adaccounts_endpoint(ORGANIZATION_ID).build(),
adaccounts_response(ad_account_id=AD_ACCOUNT_ID, organization_id=ORGANIZATION_ID),
)
http_mocker.get(
RequestBuilder.campaigns_endpoint(AD_ACCOUNT_ID).build(),
campaigns_response(campaign_id=CAMPAIGN_ID, ad_account_id=AD_ACCOUNT_ID),
)
output = _read(config_builder=config())
assert len(output.records) == 1
assert output.records[0].record.data["id"] == CAMPAIGN_ID
@HttpMocker()
def test_read_records_with_error_403_retry(self, http_mocker: HttpMocker) -> None:
"""Test that 403 errors trigger RETRY behavior with custom error message from manifest."""
http_mocker.post(
OAuthRequestBuilder.oauth_endpoint().build(),
oauth_response(),
)
http_mocker.get(
RequestBuilder.organizations_endpoint("me").build(),
organizations_response(organization_id=ORGANIZATION_ID),
)
http_mocker.get(
RequestBuilder.adaccounts_endpoint(ORGANIZATION_ID).build(),
adaccounts_response(ad_account_id=AD_ACCOUNT_ID, organization_id=ORGANIZATION_ID),
)
# First request returns 403, then succeeds on retry
http_mocker.get(
RequestBuilder.campaigns_endpoint(AD_ACCOUNT_ID).build(),
[
error_response(HTTPStatus.FORBIDDEN),
campaigns_response(campaign_id=CAMPAIGN_ID, ad_account_id=AD_ACCOUNT_ID),
],
)
output = _read(config_builder=config())
assert len(output.records) == 1
assert output.records[0].record.data["id"] == CAMPAIGN_ID
# Verify custom error message from manifest is logged
log_messages = [log.log.message for log in output.logs]
expected_error_prefix = "Got permission error when accessing URL. Skipping"
assert any(
expected_error_prefix in msg for msg in log_messages
), f"Expected custom 403 error message '{expected_error_prefix}' in logs"
assert any(_STREAM_NAME in msg for msg in log_messages), f"Expected stream name '{_STREAM_NAME}' in log messages"
class TestCampaignsSubstreamMultipleParents(TestCase):
@HttpMocker()
def test_substream_with_two_parent_records(self, http_mocker: HttpMocker) -> None:
"""Test that substream correctly processes multiple parent records.
The campaigns stream uses SubstreamPartitionRouter with adaccounts as parent.
This test verifies that campaigns are fetched for each parent adaccount.
"""
adaccount_1 = "adaccount_001"
adaccount_2 = "adaccount_002"
http_mocker.post(
OAuthRequestBuilder.oauth_endpoint().build(),
oauth_response(),
)
http_mocker.get(
RequestBuilder.organizations_endpoint("me").build(),
organizations_response(organization_id=ORGANIZATION_ID),
)
http_mocker.get(
RequestBuilder.adaccounts_endpoint(ORGANIZATION_ID).build(),
adaccounts_response_multiple([adaccount_1, adaccount_2]),
)
# Mock campaigns endpoint for each parent adaccount
http_mocker.get(
RequestBuilder.campaigns_endpoint(adaccount_1).build(),
campaigns_response(campaign_id="campaign_from_adaccount_1", ad_account_id=adaccount_1),
)
http_mocker.get(
RequestBuilder.campaigns_endpoint(adaccount_2).build(),
campaigns_response(campaign_id="campaign_from_adaccount_2", ad_account_id=adaccount_2),
)
output = _read(config_builder=config())
# Verify records from both parent adaccounts are returned
assert len(output.records) == 2
record_ids = [r.record.data.get("id") for r in output.records]
assert "campaign_from_adaccount_1" in record_ids
assert "campaign_from_adaccount_2" in record_ids
class TestCampaignsIncremental(TestCase):
@HttpMocker()
def test_incremental_first_sync_emits_state(self, http_mocker: HttpMocker) -> None:
"""Test that first sync (no state) emits state message with cursor value."""
http_mocker.post(
OAuthRequestBuilder.oauth_endpoint().build(),
oauth_response(),
)
http_mocker.get(
RequestBuilder.organizations_endpoint("me").build(),
organizations_response(organization_id=ORGANIZATION_ID),
)
http_mocker.get(
RequestBuilder.adaccounts_endpoint(ORGANIZATION_ID).build(),
adaccounts_response(ad_account_id=AD_ACCOUNT_ID, organization_id=ORGANIZATION_ID),
)
http_mocker.get(
RequestBuilder.campaigns_endpoint(AD_ACCOUNT_ID).build(),
campaigns_response(campaign_id=CAMPAIGN_ID, ad_account_id=AD_ACCOUNT_ID),
)
output = _read(config_builder=config(), sync_mode=SyncMode.incremental)
assert len(output.state_messages) > 0, "Expected state messages to be emitted"
assert len(output.records) == 1
# Get latest record's cursor
latest_record = output.records[-1].record.data
record_cursor_value = latest_record.get("updated_at")
# Get state cursor
new_state = output.most_recent_state.stream_state.__dict__
state_cursor_value = new_state.get("updated_at") or new_state.get("state", {}).get("updated_at")
# Validate state matches record
assert state_cursor_value is not None, "Expected 'updated_at' in state"
assert record_cursor_value is not None, "Expected 'updated_at' in record"
assert state_cursor_value == record_cursor_value or state_cursor_value.startswith(
record_cursor_value[:10]
), f"Expected state to match latest record. State: {state_cursor_value}, Record: {record_cursor_value}"
@HttpMocker()
def test_incremental_sync_with_state(self, http_mocker: HttpMocker) -> None:
"""Test incremental sync with previous state."""
previous_state_date = "2024-01-15T00:00:00.000000Z"
state = StateBuilder().with_stream_state(_STREAM_NAME, {"updated_at": previous_state_date}).build()
http_mocker.post(
OAuthRequestBuilder.oauth_endpoint().build(),
oauth_response(),
)
http_mocker.get(
RequestBuilder.organizations_endpoint("me").build(),
organizations_response(organization_id=ORGANIZATION_ID),
)
http_mocker.get(
RequestBuilder.adaccounts_endpoint(ORGANIZATION_ID).build(),
adaccounts_response(ad_account_id=AD_ACCOUNT_ID, organization_id=ORGANIZATION_ID),
)
http_mocker.get(
RequestBuilder.campaigns_endpoint(AD_ACCOUNT_ID).build(),
campaigns_response(campaign_id=CAMPAIGN_ID, ad_account_id=AD_ACCOUNT_ID),
)
output = _read(config_builder=config(), sync_mode=SyncMode.incremental, state=state)
assert len(output.state_messages) > 0, "Expected state messages to be emitted"
assert len(output.records) == 1
# Get latest record's cursor
latest_record = output.records[-1].record.data
record_cursor_value = latest_record.get("updated_at")
# Get state cursor
new_state = output.most_recent_state.stream_state.__dict__
state_cursor_value = new_state.get("updated_at") or new_state.get("state", {}).get("updated_at")
# Validate state matches record
assert state_cursor_value is not None, "Expected 'updated_at' in state"
assert record_cursor_value is not None, "Expected 'updated_at' in record"
assert state_cursor_value == record_cursor_value or state_cursor_value.startswith(
record_cursor_value[:10]
), f"Expected state to match latest record. State: {state_cursor_value}, Record: {record_cursor_value}"

View File

@@ -0,0 +1,280 @@
#
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
#
from http import HTTPStatus
from typing import List, Optional
from unittest import TestCase
from airbyte_cdk.models import AirbyteStateMessage, SyncMode
from airbyte_cdk.test.entrypoint_wrapper import EntrypointOutput
from airbyte_cdk.test.mock_http import HttpMocker
from airbyte_cdk.test.state_builder import StateBuilder
from .config import AD_ACCOUNT_ID, CAMPAIGN_ID, ORGANIZATION_ID, ConfigBuilder
from .request_builder import OAuthRequestBuilder, RequestBuilder
from .response_builder import (
adaccounts_response,
campaigns_response,
create_multiple_records_response,
error_response,
oauth_response,
organizations_response,
stats_lifetime_response,
stats_timeseries_response,
)
from .utils import config, read_output
def _read(
config_builder: ConfigBuilder,
stream_name: str,
sync_mode: SyncMode = SyncMode.full_refresh,
state: Optional[List[AirbyteStateMessage]] = None,
expecting_exception: bool = False,
) -> EntrypointOutput:
return read_output(
config_builder=config_builder,
stream_name=stream_name,
sync_mode=sync_mode,
state=state,
expecting_exception=expecting_exception,
)
def _setup_parent_mocks(http_mocker: HttpMocker) -> None:
http_mocker.post(
OAuthRequestBuilder.oauth_endpoint().build(),
oauth_response(),
)
http_mocker.get(
RequestBuilder.organizations_endpoint("me").build(),
organizations_response(organization_id=ORGANIZATION_ID),
)
http_mocker.get(
RequestBuilder.adaccounts_endpoint(ORGANIZATION_ID).build(),
adaccounts_response(ad_account_id=AD_ACCOUNT_ID, organization_id=ORGANIZATION_ID),
)
http_mocker.get(
RequestBuilder.campaigns_endpoint(AD_ACCOUNT_ID).build(),
campaigns_response(campaign_id=CAMPAIGN_ID, ad_account_id=AD_ACCOUNT_ID),
)
def _setup_parent_mocks_multiple_campaigns(http_mocker: HttpMocker, campaign_ids: List[str]) -> None:
"""Setup parent mocks with multiple campaigns for testing substreams."""
http_mocker.post(
OAuthRequestBuilder.oauth_endpoint().build(),
oauth_response(),
)
http_mocker.get(
RequestBuilder.organizations_endpoint("me").build(),
organizations_response(organization_id=ORGANIZATION_ID),
)
http_mocker.get(
RequestBuilder.adaccounts_endpoint(ORGANIZATION_ID).build(),
adaccounts_response(ad_account_id=AD_ACCOUNT_ID, organization_id=ORGANIZATION_ID),
)
http_mocker.get(
RequestBuilder.campaigns_endpoint(AD_ACCOUNT_ID).build(),
create_multiple_records_response("campaigns", campaign_ids),
)
class TestCampaignsStatsHourly(TestCase):
@HttpMocker()
def test_read_records(self, http_mocker: HttpMocker) -> None:
_setup_parent_mocks(http_mocker)
http_mocker.get(
RequestBuilder.campaigns_stats_endpoint(CAMPAIGN_ID).with_any_query_params().build(),
stats_timeseries_response(entity_id=CAMPAIGN_ID, granularity="HOUR"),
)
output = _read(config_builder=config(), stream_name="campaigns_stats_hourly")
# Enhanced assertions
assert len(output.records) == 5 # 5 weekly time slices (Jan 1-31 with step: P1W)
record = output.records[0].record.data
assert record.get("id") == CAMPAIGN_ID, f"Expected id={CAMPAIGN_ID}, got {record.get('id')}"
@HttpMocker()
def test_read_records_with_error_403_retry(self, http_mocker: HttpMocker) -> None:
"""Test that 403 errors trigger RETRY behavior with custom error message from manifest."""
_setup_parent_mocks(http_mocker)
# First request returns 403, then succeeds on retry
http_mocker.get(
RequestBuilder.campaigns_stats_endpoint(CAMPAIGN_ID).with_any_query_params().build(),
[
error_response(HTTPStatus.FORBIDDEN),
stats_timeseries_response(entity_id=CAMPAIGN_ID, granularity="HOUR"),
],
)
output = _read(config_builder=config(), stream_name="campaigns_stats_hourly")
assert len(output.records) == 5 # 5 weekly time slices
# Verify custom error message from manifest is logged
log_messages = [log.log.message for log in output.logs]
expected_error_prefix = "Got permission error when accessing URL. Skipping"
assert any(
expected_error_prefix in msg for msg in log_messages
), f"Expected custom 403 error message '{expected_error_prefix}' in logs"
class TestCampaignsStatsDaily(TestCase):
@HttpMocker()
def test_read_records(self, http_mocker: HttpMocker) -> None:
_setup_parent_mocks(http_mocker)
http_mocker.get(
RequestBuilder.campaigns_stats_endpoint(CAMPAIGN_ID).with_any_query_params().build(),
stats_timeseries_response(entity_id=CAMPAIGN_ID, granularity="DAY"),
)
output = _read(config_builder=config(), stream_name="campaigns_stats_daily")
assert len(output.records) == 1 # Daily: step P1M = 1 monthly slice
record = output.records[0].record.data
assert record.get("id") == CAMPAIGN_ID, f"Expected id={CAMPAIGN_ID}, got {record.get('id')}"
class TestCampaignsStatsLifetime(TestCase):
@HttpMocker()
def test_read_records(self, http_mocker: HttpMocker) -> None:
_setup_parent_mocks(http_mocker)
http_mocker.get(
RequestBuilder.campaigns_stats_endpoint(CAMPAIGN_ID).with_any_query_params().build(),
stats_lifetime_response(entity_id=CAMPAIGN_ID),
)
output = _read(config_builder=config(), stream_name="campaigns_stats_lifetime")
assert len(output.records) == 1 # Lifetime: no step
record = output.records[0].record.data
assert record.get("id") == CAMPAIGN_ID, f"Expected id={CAMPAIGN_ID}, got {record.get('id')}"
class TestCampaignsStatsTransformations(TestCase):
@HttpMocker()
def test_transformations_add_fields(self, http_mocker: HttpMocker) -> None:
"""Test that AddFields transformations are applied correctly.
The manifest defines these transformations for campaigns_stats_hourly:
- AddFields: id (from stream_slice['id'])
- AddFields: type = CAMPAIGN
- AddFields: granularity = HOUR
- AddFields: spend (from record.get('stats', {}).get('spend'))
- RemoveFields: stats
"""
_setup_parent_mocks(http_mocker)
http_mocker.get(
RequestBuilder.campaigns_stats_endpoint(CAMPAIGN_ID).with_any_query_params().build(),
stats_timeseries_response(entity_id=CAMPAIGN_ID, granularity="HOUR"),
)
output = _read(config_builder=config(), stream_name="campaigns_stats_hourly")
assert len(output.records) == 5 # 5 weekly time slices
record = output.records[0].record.data
# Verify AddFields transformations
assert record.get("id") == CAMPAIGN_ID
assert record.get("type") == "CAMPAIGN"
assert record.get("granularity") == "HOUR"
# Verify spend field is extracted from stats
assert "spend" in record
# Verify RemoveFields transformation - stats should be removed
assert "stats" not in record
class TestCampaignsStatsSubstreamMultipleParents(TestCase):
@HttpMocker()
def test_substream_with_two_parent_records(self, http_mocker: HttpMocker) -> None:
"""Test that substream correctly processes multiple parent records.
The campaigns_stats streams use SubstreamPartitionRouter with campaigns as parent.
This test verifies that stats are fetched for each parent campaign.
"""
campaign_1 = "campaign_001"
campaign_2 = "campaign_002"
_setup_parent_mocks_multiple_campaigns(http_mocker, [campaign_1, campaign_2])
# Mock stats endpoint for each parent campaign
http_mocker.get(
RequestBuilder.campaigns_stats_endpoint(campaign_1).with_any_query_params().build(),
stats_timeseries_response(entity_id=campaign_1, granularity="HOUR"),
)
http_mocker.get(
RequestBuilder.campaigns_stats_endpoint(campaign_2).with_any_query_params().build(),
stats_timeseries_response(entity_id=campaign_2, granularity="HOUR"),
)
output = _read(config_builder=config(), stream_name="campaigns_stats_hourly")
# Verify records from both parent campaigns are returned
assert len(output.records) == 10 # 2 parents × 5 weekly time slices = 10 records
record_ids = [r.record.data.get("id") for r in output.records]
assert campaign_1 in record_ids
assert campaign_2 in record_ids
class TestCampaignsStatsIncremental(TestCase):
@HttpMocker()
def test_incremental_first_sync_emits_state(self, http_mocker: HttpMocker) -> None:
"""Test that first sync (no state) emits state message with cursor value."""
_setup_parent_mocks(http_mocker)
http_mocker.get(
RequestBuilder.campaigns_stats_endpoint(CAMPAIGN_ID).with_any_query_params().build(),
stats_timeseries_response(entity_id=CAMPAIGN_ID, granularity="HOUR"),
)
output = _read(config_builder=config(), stream_name="campaigns_stats_hourly", sync_mode=SyncMode.incremental)
assert len(output.state_messages) > 0, "Expected state messages to be emitted"
assert len(output.records) == 5 # 5 weekly time slices
# Get latest record's cursor
latest_record = output.records[-1].record.data
record_cursor_value = latest_record.get("start_time")
# Get state cursor
new_state = output.most_recent_state.stream_state.__dict__
state_cursor_value = new_state.get("start_time") or new_state.get("state", {}).get("start_time")
# Validate state matches record
assert state_cursor_value is not None, "Expected 'start_time' in state"
assert record_cursor_value is not None, "Expected 'start_time' in record"
assert state_cursor_value == record_cursor_value or state_cursor_value.startswith(
record_cursor_value[:10]
), f"Expected state to match latest record. State: {state_cursor_value}, Record: {record_cursor_value}"
@HttpMocker()
def test_incremental_sync_with_state(self, http_mocker: HttpMocker) -> None:
"""Test incremental sync with previous state for stats streams."""
previous_state_date = "2024-01-15T00:00:00.000000Z"
state = StateBuilder().with_stream_state("campaigns_stats_hourly", {"start_time": previous_state_date}).build()
_setup_parent_mocks(http_mocker)
http_mocker.get(
RequestBuilder.campaigns_stats_endpoint(CAMPAIGN_ID).with_any_query_params().build(),
stats_timeseries_response(entity_id=CAMPAIGN_ID, granularity="HOUR"),
)
output = _read(config_builder=config(), stream_name="campaigns_stats_hourly", sync_mode=SyncMode.incremental, state=state)
assert len(output.state_messages) > 0, "Expected state messages to be emitted"
assert len(output.records) == 3 # 3 remaining weekly time slices after state date (Jan 15-31)
# Get latest record's cursor
latest_record = output.records[-1].record.data
record_cursor_value = latest_record.get("start_time")
# Get state cursor
new_state = output.most_recent_state.stream_state.__dict__
state_cursor_value = new_state.get("start_time") or new_state.get("state", {}).get("start_time")
# Validate state matches record
assert state_cursor_value is not None, "Expected 'start_time' in state"
assert record_cursor_value is not None, "Expected 'start_time' in record"
assert state_cursor_value == record_cursor_value or state_cursor_value.startswith(
record_cursor_value[:10]
), f"Expected state to match latest record. State: {state_cursor_value}, Record: {record_cursor_value}"

View File

@@ -0,0 +1,230 @@
#
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
#
from http import HTTPStatus
from typing import List, Optional
from unittest import TestCase
from airbyte_cdk.models import AirbyteStateMessage, SyncMode
from airbyte_cdk.test.entrypoint_wrapper import EntrypointOutput
from airbyte_cdk.test.mock_http import HttpMocker
from airbyte_cdk.test.state_builder import StateBuilder
from .config import AD_ACCOUNT_ID, ORGANIZATION_ID, ConfigBuilder
from .request_builder import OAuthRequestBuilder, RequestBuilder
from .response_builder import (
adaccounts_response,
adaccounts_response_multiple,
creatives_response,
error_response,
oauth_response,
organizations_response,
)
from .utils import config, read_output
_STREAM_NAME = "creatives"
def _read(
config_builder: ConfigBuilder,
sync_mode: SyncMode = SyncMode.full_refresh,
state: Optional[List[AirbyteStateMessage]] = None,
expecting_exception: bool = False,
) -> EntrypointOutput:
return read_output(
config_builder=config_builder,
stream_name=_STREAM_NAME,
sync_mode=sync_mode,
state=state,
expecting_exception=expecting_exception,
)
class TestCreatives(TestCase):
@HttpMocker()
def test_read_records(self, http_mocker: HttpMocker) -> None:
http_mocker.post(
OAuthRequestBuilder.oauth_endpoint().build(),
oauth_response(),
)
http_mocker.get(
RequestBuilder.organizations_endpoint("me").build(),
organizations_response(organization_id=ORGANIZATION_ID),
)
http_mocker.get(
RequestBuilder.adaccounts_endpoint(ORGANIZATION_ID).build(),
adaccounts_response(ad_account_id=AD_ACCOUNT_ID, organization_id=ORGANIZATION_ID),
)
http_mocker.get(
RequestBuilder.creatives_endpoint(AD_ACCOUNT_ID).build(),
creatives_response(creative_id="test_creative_123", ad_account_id=AD_ACCOUNT_ID),
)
output = _read(config_builder=config())
assert len(output.records) == 1
assert output.records[0].record.data["id"] == "test_creative_123"
@HttpMocker()
def test_read_records_with_error_403_retry(self, http_mocker: HttpMocker) -> None:
"""Test that 403 errors trigger RETRY behavior with custom error message from manifest."""
http_mocker.post(
OAuthRequestBuilder.oauth_endpoint().build(),
oauth_response(),
)
http_mocker.get(
RequestBuilder.organizations_endpoint("me").build(),
organizations_response(organization_id=ORGANIZATION_ID),
)
http_mocker.get(
RequestBuilder.adaccounts_endpoint(ORGANIZATION_ID).build(),
adaccounts_response(ad_account_id=AD_ACCOUNT_ID, organization_id=ORGANIZATION_ID),
)
# First request returns 403, then succeeds on retry
http_mocker.get(
RequestBuilder.creatives_endpoint(AD_ACCOUNT_ID).build(),
[
error_response(HTTPStatus.FORBIDDEN),
creatives_response(creative_id="test_creative_123", ad_account_id=AD_ACCOUNT_ID),
],
)
output = _read(config_builder=config())
assert len(output.records) == 1
assert output.records[0].record.data["id"] == "test_creative_123"
# Verify custom error message from manifest is logged
log_messages = [log.log.message for log in output.logs]
expected_error_prefix = "Got permission error when accessing URL. Skipping"
assert any(
expected_error_prefix in msg for msg in log_messages
), f"Expected custom 403 error message '{expected_error_prefix}' in logs"
assert any(_STREAM_NAME in msg for msg in log_messages), f"Expected stream name '{_STREAM_NAME}' in log messages"
class TestCreativesSubstreamMultipleParents(TestCase):
@HttpMocker()
def test_substream_with_two_parent_records(self, http_mocker: HttpMocker) -> None:
"""Test that substream correctly processes multiple parent records.
The creatives stream uses SubstreamPartitionRouter with adaccounts as parent.
This test verifies that creatives are fetched for each parent adaccount.
"""
adaccount_1 = "adaccount_001"
adaccount_2 = "adaccount_002"
http_mocker.post(
OAuthRequestBuilder.oauth_endpoint().build(),
oauth_response(),
)
http_mocker.get(
RequestBuilder.organizations_endpoint("me").build(),
organizations_response(organization_id=ORGANIZATION_ID),
)
http_mocker.get(
RequestBuilder.adaccounts_endpoint(ORGANIZATION_ID).build(),
adaccounts_response_multiple([adaccount_1, adaccount_2]),
)
# Mock creatives endpoint for each parent adaccount
http_mocker.get(
RequestBuilder.creatives_endpoint(adaccount_1).build(),
creatives_response(creative_id="creative_from_adaccount_1", ad_account_id=adaccount_1),
)
http_mocker.get(
RequestBuilder.creatives_endpoint(adaccount_2).build(),
creatives_response(creative_id="creative_from_adaccount_2", ad_account_id=adaccount_2),
)
output = _read(config_builder=config())
# Verify records from both parent adaccounts are returned
assert len(output.records) == 2
record_ids = [r.record.data.get("id") for r in output.records]
assert "creative_from_adaccount_1" in record_ids
assert "creative_from_adaccount_2" in record_ids
class TestCreativesIncremental(TestCase):
@HttpMocker()
def test_incremental_first_sync_emits_state(self, http_mocker: HttpMocker) -> None:
"""Test that first sync (no state) emits state message with cursor value."""
http_mocker.post(
OAuthRequestBuilder.oauth_endpoint().build(),
oauth_response(),
)
http_mocker.get(
RequestBuilder.organizations_endpoint("me").build(),
organizations_response(organization_id=ORGANIZATION_ID),
)
http_mocker.get(
RequestBuilder.adaccounts_endpoint(ORGANIZATION_ID).build(),
adaccounts_response(ad_account_id=AD_ACCOUNT_ID, organization_id=ORGANIZATION_ID),
)
http_mocker.get(
RequestBuilder.creatives_endpoint(AD_ACCOUNT_ID).build(),
creatives_response(creative_id="test_creative_123", ad_account_id=AD_ACCOUNT_ID),
)
output = _read(config_builder=config(), sync_mode=SyncMode.incremental)
assert len(output.state_messages) > 0, "Expected state messages to be emitted"
assert len(output.records) == 1
# Get latest record's cursor
latest_record = output.records[-1].record.data
record_cursor_value = latest_record.get("updated_at")
# Get state cursor
new_state = output.most_recent_state.stream_state.__dict__
state_cursor_value = new_state.get("updated_at") or new_state.get("state", {}).get("updated_at")
# Validate state matches record
assert state_cursor_value is not None, "Expected 'updated_at' in state"
assert record_cursor_value is not None, "Expected 'updated_at' in record"
assert state_cursor_value == record_cursor_value or state_cursor_value.startswith(
record_cursor_value[:10]
), f"Expected state to match latest record. State: {state_cursor_value}, Record: {record_cursor_value}"
@HttpMocker()
def test_incremental_sync_with_state(self, http_mocker: HttpMocker) -> None:
"""Test incremental sync with previous state."""
previous_state_date = "2024-01-15T00:00:00.000000Z"
state = StateBuilder().with_stream_state(_STREAM_NAME, {"updated_at": previous_state_date}).build()
http_mocker.post(
OAuthRequestBuilder.oauth_endpoint().build(),
oauth_response(),
)
http_mocker.get(
RequestBuilder.organizations_endpoint("me").build(),
organizations_response(organization_id=ORGANIZATION_ID),
)
http_mocker.get(
RequestBuilder.adaccounts_endpoint(ORGANIZATION_ID).build(),
adaccounts_response(ad_account_id=AD_ACCOUNT_ID, organization_id=ORGANIZATION_ID),
)
http_mocker.get(
RequestBuilder.creatives_endpoint(AD_ACCOUNT_ID).build(),
creatives_response(creative_id="test_creative_123", ad_account_id=AD_ACCOUNT_ID),
)
output = _read(config_builder=config(), sync_mode=SyncMode.incremental, state=state)
assert len(output.state_messages) > 0, "Expected state messages to be emitted"
assert len(output.records) == 1
# Get latest record's cursor
latest_record = output.records[-1].record.data
record_cursor_value = latest_record.get("updated_at")
# Get state cursor
new_state = output.most_recent_state.stream_state.__dict__
state_cursor_value = new_state.get("updated_at") or new_state.get("state", {}).get("updated_at")
# Validate state matches record
assert state_cursor_value is not None, "Expected 'updated_at' in state"
assert record_cursor_value is not None, "Expected 'updated_at' in record"
assert state_cursor_value == record_cursor_value or state_cursor_value.startswith(
record_cursor_value[:10]
), f"Expected state to match latest record. State: {state_cursor_value}, Record: {record_cursor_value}"

View File

@@ -0,0 +1,230 @@
#
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
#
from http import HTTPStatus
from typing import List, Optional
from unittest import TestCase
from airbyte_cdk.models import AirbyteStateMessage, SyncMode
from airbyte_cdk.test.entrypoint_wrapper import EntrypointOutput
from airbyte_cdk.test.mock_http import HttpMocker
from airbyte_cdk.test.state_builder import StateBuilder
from .config import AD_ACCOUNT_ID, ORGANIZATION_ID, ConfigBuilder
from .request_builder import OAuthRequestBuilder, RequestBuilder
from .response_builder import (
adaccounts_response,
adaccounts_response_multiple,
error_response,
media_response,
oauth_response,
organizations_response,
)
from .utils import config, read_output
_STREAM_NAME = "media"
def _read(
config_builder: ConfigBuilder,
sync_mode: SyncMode = SyncMode.full_refresh,
state: Optional[List[AirbyteStateMessage]] = None,
expecting_exception: bool = False,
) -> EntrypointOutput:
return read_output(
config_builder=config_builder,
stream_name=_STREAM_NAME,
sync_mode=sync_mode,
state=state,
expecting_exception=expecting_exception,
)
class TestMedia(TestCase):
@HttpMocker()
def test_read_records(self, http_mocker: HttpMocker) -> None:
http_mocker.post(
OAuthRequestBuilder.oauth_endpoint().build(),
oauth_response(),
)
http_mocker.get(
RequestBuilder.organizations_endpoint("me").build(),
organizations_response(organization_id=ORGANIZATION_ID),
)
http_mocker.get(
RequestBuilder.adaccounts_endpoint(ORGANIZATION_ID).build(),
adaccounts_response(ad_account_id=AD_ACCOUNT_ID, organization_id=ORGANIZATION_ID),
)
http_mocker.get(
RequestBuilder.media_endpoint(AD_ACCOUNT_ID).build(),
media_response(media_id="test_media_123", ad_account_id=AD_ACCOUNT_ID),
)
output = _read(config_builder=config())
assert len(output.records) == 1
assert output.records[0].record.data["id"] == "test_media_123"
@HttpMocker()
def test_read_records_with_error_403_retry(self, http_mocker: HttpMocker) -> None:
"""Test that 403 errors trigger RETRY behavior with custom error message from manifest."""
http_mocker.post(
OAuthRequestBuilder.oauth_endpoint().build(),
oauth_response(),
)
http_mocker.get(
RequestBuilder.organizations_endpoint("me").build(),
organizations_response(organization_id=ORGANIZATION_ID),
)
http_mocker.get(
RequestBuilder.adaccounts_endpoint(ORGANIZATION_ID).build(),
adaccounts_response(ad_account_id=AD_ACCOUNT_ID, organization_id=ORGANIZATION_ID),
)
# First request returns 403, then succeeds on retry
http_mocker.get(
RequestBuilder.media_endpoint(AD_ACCOUNT_ID).build(),
[
error_response(HTTPStatus.FORBIDDEN),
media_response(media_id="test_media_123", ad_account_id=AD_ACCOUNT_ID),
],
)
output = _read(config_builder=config())
assert len(output.records) == 1
assert output.records[0].record.data["id"] == "test_media_123"
# Verify custom error message from manifest is logged
log_messages = [log.log.message for log in output.logs]
expected_error_prefix = "Got permission error when accessing URL. Skipping"
assert any(
expected_error_prefix in msg for msg in log_messages
), f"Expected custom 403 error message '{expected_error_prefix}' in logs"
assert any(_STREAM_NAME in msg for msg in log_messages), f"Expected stream name '{_STREAM_NAME}' in log messages"
class TestMediaSubstreamMultipleParents(TestCase):
@HttpMocker()
def test_substream_with_two_parent_records(self, http_mocker: HttpMocker) -> None:
"""Test that substream correctly processes multiple parent records.
The media stream uses SubstreamPartitionRouter with adaccounts as parent.
This test verifies that media are fetched for each parent adaccount.
"""
adaccount_1 = "adaccount_001"
adaccount_2 = "adaccount_002"
http_mocker.post(
OAuthRequestBuilder.oauth_endpoint().build(),
oauth_response(),
)
http_mocker.get(
RequestBuilder.organizations_endpoint("me").build(),
organizations_response(organization_id=ORGANIZATION_ID),
)
http_mocker.get(
RequestBuilder.adaccounts_endpoint(ORGANIZATION_ID).build(),
adaccounts_response_multiple([adaccount_1, adaccount_2]),
)
# Mock media endpoint for each parent adaccount
http_mocker.get(
RequestBuilder.media_endpoint(adaccount_1).build(),
media_response(media_id="media_from_adaccount_1", ad_account_id=adaccount_1),
)
http_mocker.get(
RequestBuilder.media_endpoint(adaccount_2).build(),
media_response(media_id="media_from_adaccount_2", ad_account_id=adaccount_2),
)
output = _read(config_builder=config())
# Verify records from both parent adaccounts are returned
assert len(output.records) == 2
record_ids = [r.record.data.get("id") for r in output.records]
assert "media_from_adaccount_1" in record_ids
assert "media_from_adaccount_2" in record_ids
class TestMediaIncremental(TestCase):
@HttpMocker()
def test_incremental_first_sync_emits_state(self, http_mocker: HttpMocker) -> None:
"""Test that first sync (no state) emits state message with cursor value."""
http_mocker.post(
OAuthRequestBuilder.oauth_endpoint().build(),
oauth_response(),
)
http_mocker.get(
RequestBuilder.organizations_endpoint("me").build(),
organizations_response(organization_id=ORGANIZATION_ID),
)
http_mocker.get(
RequestBuilder.adaccounts_endpoint(ORGANIZATION_ID).build(),
adaccounts_response(ad_account_id=AD_ACCOUNT_ID, organization_id=ORGANIZATION_ID),
)
http_mocker.get(
RequestBuilder.media_endpoint(AD_ACCOUNT_ID).build(),
media_response(media_id="test_media_123", ad_account_id=AD_ACCOUNT_ID),
)
output = _read(config_builder=config(), sync_mode=SyncMode.incremental)
assert len(output.state_messages) > 0, "Expected state messages to be emitted"
assert len(output.records) == 1
# Get latest record's cursor
latest_record = output.records[-1].record.data
record_cursor_value = latest_record.get("updated_at")
# Get state cursor
new_state = output.most_recent_state.stream_state.__dict__
state_cursor_value = new_state.get("updated_at") or new_state.get("state", {}).get("updated_at")
# Validate state matches record
assert state_cursor_value is not None, "Expected 'updated_at' in state"
assert record_cursor_value is not None, "Expected 'updated_at' in record"
assert state_cursor_value == record_cursor_value or state_cursor_value.startswith(
record_cursor_value[:10]
), f"Expected state to match latest record. State: {state_cursor_value}, Record: {record_cursor_value}"
@HttpMocker()
def test_incremental_sync_with_state(self, http_mocker: HttpMocker) -> None:
"""Test incremental sync with previous state."""
previous_state_date = "2024-01-15T00:00:00.000000Z"
state = StateBuilder().with_stream_state(_STREAM_NAME, {"updated_at": previous_state_date}).build()
http_mocker.post(
OAuthRequestBuilder.oauth_endpoint().build(),
oauth_response(),
)
http_mocker.get(
RequestBuilder.organizations_endpoint("me").build(),
organizations_response(organization_id=ORGANIZATION_ID),
)
http_mocker.get(
RequestBuilder.adaccounts_endpoint(ORGANIZATION_ID).build(),
adaccounts_response(ad_account_id=AD_ACCOUNT_ID, organization_id=ORGANIZATION_ID),
)
http_mocker.get(
RequestBuilder.media_endpoint(AD_ACCOUNT_ID).build(),
media_response(media_id="test_media_123", ad_account_id=AD_ACCOUNT_ID),
)
output = _read(config_builder=config(), sync_mode=SyncMode.incremental, state=state)
assert len(output.state_messages) > 0, "Expected state messages to be emitted"
assert len(output.records) == 1
# Get latest record's cursor
latest_record = output.records[-1].record.data
record_cursor_value = latest_record.get("updated_at")
# Get state cursor
new_state = output.most_recent_state.stream_state.__dict__
state_cursor_value = new_state.get("updated_at") or new_state.get("state", {}).get("updated_at")
# Validate state matches record
assert state_cursor_value is not None, "Expected 'updated_at' in state"
assert record_cursor_value is not None, "Expected 'updated_at' in record"
assert state_cursor_value == record_cursor_value or state_cursor_value.startswith(
record_cursor_value[:10]
), f"Expected state to match latest record. State: {state_cursor_value}, Record: {record_cursor_value}"

View File

@@ -0,0 +1,198 @@
#
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
#
from datetime import datetime
from http import HTTPStatus
from typing import List, Optional
from unittest import TestCase
from airbyte_cdk.models import AirbyteStateMessage, SyncMode
from airbyte_cdk.test.entrypoint_wrapper import EntrypointOutput
from airbyte_cdk.test.mock_http import HttpMocker
from airbyte_cdk.test.state_builder import StateBuilder
from .config import ORGANIZATION_ID, ConfigBuilder
from .request_builder import OAuthRequestBuilder, RequestBuilder
from .response_builder import (
create_empty_response,
error_response,
oauth_response,
organizations_response,
)
from .utils import config, read_output
_STREAM_NAME = "organizations"
def _read(
config_builder: ConfigBuilder,
sync_mode: SyncMode = SyncMode.full_refresh,
state: Optional[List[AirbyteStateMessage]] = None,
expecting_exception: bool = False,
) -> EntrypointOutput:
return read_output(
config_builder=config_builder,
stream_name=_STREAM_NAME,
sync_mode=sync_mode,
state=state,
expecting_exception=expecting_exception,
)
class TestOrganizations(TestCase):
@HttpMocker()
def test_read_records(self, http_mocker: HttpMocker) -> None:
http_mocker.post(
OAuthRequestBuilder.oauth_endpoint().build(),
oauth_response(),
)
http_mocker.get(
RequestBuilder.organizations_endpoint("me").build(),
organizations_response(organization_id=ORGANIZATION_ID),
)
output = _read(config_builder=config())
assert len(output.records) == 1
assert output.records[0].record.data["id"] == ORGANIZATION_ID
@HttpMocker()
def test_read_records_with_organization_ids(self, http_mocker: HttpMocker) -> None:
http_mocker.post(
OAuthRequestBuilder.oauth_endpoint().build(),
oauth_response(),
)
http_mocker.get(
RequestBuilder.organizations_endpoint(ORGANIZATION_ID).build(),
organizations_response(organization_id=ORGANIZATION_ID),
)
output = _read(config_builder=config().with_organization_ids([ORGANIZATION_ID]))
assert len(output.records) == 1
assert output.records[0].record.data["id"] == ORGANIZATION_ID
@HttpMocker()
def test_read_records_with_error_403_retry(self, http_mocker: HttpMocker) -> None:
"""Test that 403 errors trigger RETRY behavior with custom error message from manifest."""
http_mocker.post(
OAuthRequestBuilder.oauth_endpoint().build(),
oauth_response(),
)
# First request returns 403, then succeeds on retry
http_mocker.get(
RequestBuilder.organizations_endpoint("me").build(),
[
error_response(HTTPStatus.FORBIDDEN),
organizations_response(organization_id=ORGANIZATION_ID),
],
)
output = _read(config_builder=config())
assert len(output.records) == 1
assert output.records[0].record.data["id"] == ORGANIZATION_ID
# Verify custom error message from manifest is logged
log_messages = [log.log.message for log in output.logs]
expected_error_prefix = "Got permission error when accessing URL. Skipping"
assert any(
expected_error_prefix in msg for msg in log_messages
), f"Expected custom 403 error message '{expected_error_prefix}' in logs"
assert any(_STREAM_NAME in msg for msg in log_messages), f"Expected stream name '{_STREAM_NAME}' in log messages"
class TestOrganizationsEmptyResults(TestCase):
@HttpMocker()
def test_empty_results(self, http_mocker: HttpMocker) -> None:
"""Test handling of 0-record responses from API (GAP 2)."""
http_mocker.post(
OAuthRequestBuilder.oauth_endpoint().build(),
oauth_response(),
)
http_mocker.get(
RequestBuilder.organizations_endpoint("me").build(),
create_empty_response("organizations"),
)
output = _read(config_builder=config())
assert len(output.records) == 0
assert len(output.errors) == 0
# Verify sync completed successfully
log_messages = [log.log.message for log in output.logs]
assert any("Finished syncing" in msg or "Read" in msg for msg in log_messages)
class TestOrganizationsIncremental(TestCase):
@HttpMocker()
def test_incremental_first_sync_emits_state(self, http_mocker: HttpMocker) -> None:
"""Test that first sync (no state) emits state message with cursor value."""
http_mocker.post(
OAuthRequestBuilder.oauth_endpoint().build(),
oauth_response(),
)
http_mocker.get(
RequestBuilder.organizations_endpoint("me").build(),
organizations_response(organization_id=ORGANIZATION_ID),
)
output = _read(config_builder=config(), sync_mode=SyncMode.incremental)
assert len(output.state_messages) > 0, "Expected state messages to be emitted"
assert len(output.records) == 1
# Get latest record's cursor
latest_record = output.records[-1].record.data
record_cursor_value = latest_record.get("updated_at")
# Get state cursor
new_state = output.most_recent_state.stream_state.__dict__
state_cursor_value = new_state.get("updated_at") or new_state.get("state", {}).get("updated_at")
# Validate state matches record
assert state_cursor_value is not None, "Expected 'updated_at' in state"
assert record_cursor_value is not None, "Expected 'updated_at' in record"
assert state_cursor_value == record_cursor_value or state_cursor_value.startswith(
record_cursor_value[:10]
), f"Expected state to match latest record. State: {state_cursor_value}, Record: {record_cursor_value}"
@HttpMocker()
def test_incremental_sync_with_state(self, http_mocker: HttpMocker) -> None:
"""Test incremental sync with previous state.
This test validates:
- Connector accepts state from previous sync
- State is passed to both get_source() and read()
- Records are returned
- State advances to latest record's cursor value
"""
previous_state_date = "2024-01-15T00:00:00.000000Z"
state = StateBuilder().with_stream_state(_STREAM_NAME, {"updated_at": previous_state_date}).build()
http_mocker.post(
OAuthRequestBuilder.oauth_endpoint().build(),
oauth_response(),
)
http_mocker.get(
RequestBuilder.organizations_endpoint("me").build(),
organizations_response(organization_id=ORGANIZATION_ID),
)
output = _read(config_builder=config(), sync_mode=SyncMode.incremental, state=state)
assert len(output.state_messages) > 0, "Expected state messages to be emitted"
assert len(output.records) == 1
# Get latest record's cursor
latest_record = output.records[-1].record.data
record_cursor_value = latest_record.get("updated_at")
# Get state cursor
new_state = output.most_recent_state.stream_state.__dict__
state_cursor_value = new_state.get("updated_at") or new_state.get("state", {}).get("updated_at")
# Validate state matches record
assert state_cursor_value is not None, "Expected 'updated_at' in state"
assert record_cursor_value is not None, "Expected 'updated_at' in record"
assert state_cursor_value == record_cursor_value or state_cursor_value.startswith(
record_cursor_value[:10]
), f"Expected state to match latest record. State: {state_cursor_value}, Record: {record_cursor_value}"

View File

@@ -0,0 +1,230 @@
#
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
#
from http import HTTPStatus
from typing import List, Optional
from unittest import TestCase
from airbyte_cdk.models import AirbyteStateMessage, SyncMode
from airbyte_cdk.test.entrypoint_wrapper import EntrypointOutput
from airbyte_cdk.test.mock_http import HttpMocker
from airbyte_cdk.test.state_builder import StateBuilder
from .config import AD_ACCOUNT_ID, ORGANIZATION_ID, ConfigBuilder
from .request_builder import OAuthRequestBuilder, RequestBuilder
from .response_builder import (
adaccounts_response,
adaccounts_response_multiple,
error_response,
oauth_response,
organizations_response,
segments_response,
)
from .utils import config, read_output
_STREAM_NAME = "segments"
def _read(
config_builder: ConfigBuilder,
sync_mode: SyncMode = SyncMode.full_refresh,
state: Optional[List[AirbyteStateMessage]] = None,
expecting_exception: bool = False,
) -> EntrypointOutput:
return read_output(
config_builder=config_builder,
stream_name=_STREAM_NAME,
sync_mode=sync_mode,
state=state,
expecting_exception=expecting_exception,
)
class TestSegments(TestCase):
@HttpMocker()
def test_read_records(self, http_mocker: HttpMocker) -> None:
http_mocker.post(
OAuthRequestBuilder.oauth_endpoint().build(),
oauth_response(),
)
http_mocker.get(
RequestBuilder.organizations_endpoint("me").build(),
organizations_response(organization_id=ORGANIZATION_ID),
)
http_mocker.get(
RequestBuilder.adaccounts_endpoint(ORGANIZATION_ID).build(),
adaccounts_response(ad_account_id=AD_ACCOUNT_ID, organization_id=ORGANIZATION_ID),
)
http_mocker.get(
RequestBuilder.segments_endpoint(AD_ACCOUNT_ID).build(),
segments_response(segment_id="test_segment_123", ad_account_id=AD_ACCOUNT_ID),
)
output = _read(config_builder=config())
assert len(output.records) == 1
assert output.records[0].record.data["id"] == "test_segment_123"
@HttpMocker()
def test_read_records_with_error_403_retry(self, http_mocker: HttpMocker) -> None:
"""Test that 403 errors trigger RETRY behavior with custom error message from manifest."""
http_mocker.post(
OAuthRequestBuilder.oauth_endpoint().build(),
oauth_response(),
)
http_mocker.get(
RequestBuilder.organizations_endpoint("me").build(),
organizations_response(organization_id=ORGANIZATION_ID),
)
http_mocker.get(
RequestBuilder.adaccounts_endpoint(ORGANIZATION_ID).build(),
adaccounts_response(ad_account_id=AD_ACCOUNT_ID, organization_id=ORGANIZATION_ID),
)
# First request returns 403, then succeeds on retry
http_mocker.get(
RequestBuilder.segments_endpoint(AD_ACCOUNT_ID).build(),
[
error_response(HTTPStatus.FORBIDDEN),
segments_response(segment_id="test_segment_123", ad_account_id=AD_ACCOUNT_ID),
],
)
output = _read(config_builder=config())
assert len(output.records) == 1
assert output.records[0].record.data["id"] == "test_segment_123"
# Verify custom error message from manifest is logged
log_messages = [log.log.message for log in output.logs]
expected_error_prefix = "Got permission error when accessing URL. Skipping"
assert any(
expected_error_prefix in msg for msg in log_messages
), f"Expected custom 403 error message '{expected_error_prefix}' in logs"
assert any(_STREAM_NAME in msg for msg in log_messages), f"Expected stream name '{_STREAM_NAME}' in log messages"
class TestSegmentsSubstreamMultipleParents(TestCase):
@HttpMocker()
def test_substream_with_two_parent_records(self, http_mocker: HttpMocker) -> None:
"""Test that substream correctly processes multiple parent records.
The segments stream uses SubstreamPartitionRouter with adaccounts as parent.
This test verifies that segments are fetched for each parent adaccount.
"""
adaccount_1 = "adaccount_001"
adaccount_2 = "adaccount_002"
http_mocker.post(
OAuthRequestBuilder.oauth_endpoint().build(),
oauth_response(),
)
http_mocker.get(
RequestBuilder.organizations_endpoint("me").build(),
organizations_response(organization_id=ORGANIZATION_ID),
)
http_mocker.get(
RequestBuilder.adaccounts_endpoint(ORGANIZATION_ID).build(),
adaccounts_response_multiple([adaccount_1, adaccount_2]),
)
# Mock segments endpoint for each parent adaccount
http_mocker.get(
RequestBuilder.segments_endpoint(adaccount_1).build(),
segments_response(segment_id="segment_from_adaccount_1", ad_account_id=adaccount_1),
)
http_mocker.get(
RequestBuilder.segments_endpoint(adaccount_2).build(),
segments_response(segment_id="segment_from_adaccount_2", ad_account_id=adaccount_2),
)
output = _read(config_builder=config())
# Verify records from both parent adaccounts are returned
assert len(output.records) == 2
record_ids = [r.record.data.get("id") for r in output.records]
assert "segment_from_adaccount_1" in record_ids
assert "segment_from_adaccount_2" in record_ids
class TestSegmentsIncremental(TestCase):
@HttpMocker()
def test_incremental_first_sync_emits_state(self, http_mocker: HttpMocker) -> None:
"""Test that first sync (no state) emits state message with cursor value."""
http_mocker.post(
OAuthRequestBuilder.oauth_endpoint().build(),
oauth_response(),
)
http_mocker.get(
RequestBuilder.organizations_endpoint("me").build(),
organizations_response(organization_id=ORGANIZATION_ID),
)
http_mocker.get(
RequestBuilder.adaccounts_endpoint(ORGANIZATION_ID).build(),
adaccounts_response(ad_account_id=AD_ACCOUNT_ID, organization_id=ORGANIZATION_ID),
)
http_mocker.get(
RequestBuilder.segments_endpoint(AD_ACCOUNT_ID).build(),
segments_response(segment_id="test_segment_123", ad_account_id=AD_ACCOUNT_ID),
)
output = _read(config_builder=config(), sync_mode=SyncMode.incremental)
assert len(output.state_messages) > 0, "Expected state messages to be emitted"
assert len(output.records) == 1
# Get latest record's cursor
latest_record = output.records[-1].record.data
record_cursor_value = latest_record.get("updated_at")
# Get state cursor
new_state = output.most_recent_state.stream_state.__dict__
state_cursor_value = new_state.get("updated_at") or new_state.get("state", {}).get("updated_at")
# Validate state matches record
assert state_cursor_value is not None, "Expected 'updated_at' in state"
assert record_cursor_value is not None, "Expected 'updated_at' in record"
assert state_cursor_value == record_cursor_value or state_cursor_value.startswith(
record_cursor_value[:10]
), f"Expected state to match latest record. State: {state_cursor_value}, Record: {record_cursor_value}"
@HttpMocker()
def test_incremental_sync_with_state(self, http_mocker: HttpMocker) -> None:
"""Test incremental sync with previous state."""
previous_state_date = "2024-01-15T00:00:00.000000Z"
state = StateBuilder().with_stream_state(_STREAM_NAME, {"updated_at": previous_state_date}).build()
http_mocker.post(
OAuthRequestBuilder.oauth_endpoint().build(),
oauth_response(),
)
http_mocker.get(
RequestBuilder.organizations_endpoint("me").build(),
organizations_response(organization_id=ORGANIZATION_ID),
)
http_mocker.get(
RequestBuilder.adaccounts_endpoint(ORGANIZATION_ID).build(),
adaccounts_response(ad_account_id=AD_ACCOUNT_ID, organization_id=ORGANIZATION_ID),
)
http_mocker.get(
RequestBuilder.segments_endpoint(AD_ACCOUNT_ID).build(),
segments_response(segment_id="test_segment_123", ad_account_id=AD_ACCOUNT_ID),
)
output = _read(config_builder=config(), sync_mode=SyncMode.incremental, state=state)
assert len(output.state_messages) > 0, "Expected state messages to be emitted"
assert len(output.records) == 1
# Get latest record's cursor
latest_record = output.records[-1].record.data
record_cursor_value = latest_record.get("updated_at")
# Get state cursor
new_state = output.most_recent_state.stream_state.__dict__
state_cursor_value = new_state.get("updated_at") or new_state.get("state", {}).get("updated_at")
# Validate state matches record
assert state_cursor_value is not None, "Expected 'updated_at' in state"
assert record_cursor_value is not None, "Expected 'updated_at' in record"
assert state_cursor_value == record_cursor_value or state_cursor_value.startswith(
record_cursor_value[:10]
), f"Expected state to match latest record. State: {state_cursor_value}, Record: {record_cursor_value}"

View File

@@ -0,0 +1,34 @@
#
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
#
from typing import List, Optional
from airbyte_cdk.models import AirbyteStateMessage, ConfiguredAirbyteCatalog, SyncMode
from airbyte_cdk.test.catalog_builder import CatalogBuilder
from airbyte_cdk.test.entrypoint_wrapper import EntrypointOutput, read
from ..conftest import get_source
from .config import ConfigBuilder
def catalog(stream_name: str, sync_mode: SyncMode) -> ConfiguredAirbyteCatalog:
return CatalogBuilder().with_stream(stream_name, sync_mode).build()
def config() -> ConfigBuilder:
return ConfigBuilder()
def read_output(
config_builder: ConfigBuilder,
stream_name: str,
sync_mode: SyncMode,
state: Optional[List[AirbyteStateMessage]] = None,
expecting_exception: Optional[bool] = False,
) -> EntrypointOutput:
_catalog = catalog(stream_name, sync_mode)
_config = config_builder.build()
# Pass state to BOTH get_source() and read() for proper incremental sync behavior
source = get_source(config=_config, state=state)
return read(source, _config, _catalog, state, expecting_exception)

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,23 @@
[build-system]
requires = ["poetry-core>=1.0.0"]
build-backend = "poetry.core.masonry.api"
[tool.poetry]
name = "source-snapchat-marketing"
version = "0.0.0"
description = "Unit tests for source-snapchat-marketing"
authors = ["Airbyte <contact@airbyte.io>"]
[tool.poetry.dependencies]
python = "^3.10,<3.13"
airbyte-cdk = "^6"
pytest = "^8"
freezegun = "^1.4.0"
pytest-mock = "^3.6.1"
requests-mock = "^1.12.1"
mock = "^5.1.0"
[tool.pytest.ini_options]
filterwarnings = [
"ignore:This class is experimental*"
]

View File

@@ -0,0 +1,27 @@
{
"request_status": "SUCCESS",
"request_id": "test_request_id",
"adaccounts": [
{
"sub_request_status": "SUCCESS",
"adaccount": {
"id": "test_adaccount_456",
"updated_at": "2024-01-15T10:00:00.000Z",
"created_at": "2023-01-01T00:00:00.000Z",
"name": "Test Ad Account",
"type": "PARTNER",
"status": "ACTIVE",
"organization_id": "test_org_123",
"currency": "USD",
"timezone": "America/Los_Angeles",
"advertiser_organization_id": "test_org_123",
"advertiser": "Test Advertiser",
"billing_type": "IO",
"billing_center_id": "test_billing_center",
"lifetime_spend_cap_micro": 0,
"agency_representing_client": false,
"client_paying_invoices": false
}
}
]
}

View File

@@ -0,0 +1,22 @@
{
"request_status": "SUCCESS",
"request_id": "test_request_id",
"ads": [
{
"sub_request_status": "SUCCESS",
"ad": {
"id": "test_ad_345",
"updated_at": "2024-01-15T10:00:00.000Z",
"created_at": "2023-01-01T00:00:00.000Z",
"name": "Test Ad",
"ad_squad_id": "test_adsquad_012",
"creative_id": "test_creative_123",
"status": "ACTIVE",
"type": "SNAP_AD",
"render_type": "STATIC",
"review_status": "APPROVED",
"review_status_reasons": []
}
}
]
}

View File

@@ -0,0 +1,27 @@
{
"request_status": "SUCCESS",
"request_id": "test_request_id",
"adsquads": [
{
"sub_request_status": "SUCCESS",
"adsquad": {
"id": "test_adsquad_012",
"updated_at": "2024-01-15T10:00:00.000Z",
"created_at": "2023-01-01T00:00:00.000Z",
"name": "Test Ad Squad",
"status": "ACTIVE",
"campaign_id": "test_campaign_789",
"type": "SNAP_ADS",
"targeting": {},
"targeting_reach_status": "VALID",
"placement": "SNAP_ADS",
"billing_event": "IMPRESSION",
"auto_bid": true,
"bid_strategy": "AUTO_BID",
"daily_budget_micro": 50000000,
"start_time": "2024-01-01T00:00:00.000Z",
"optimization_goal": "IMPRESSIONS"
}
}
]
}

View File

@@ -0,0 +1,24 @@
{
"request_status": "SUCCESS",
"request_id": "test_request_id",
"campaigns": [
{
"sub_request_status": "SUCCESS",
"campaign": {
"id": "test_campaign_789",
"updated_at": "2024-01-15T10:00:00.000Z",
"created_at": "2023-01-01T00:00:00.000Z",
"name": "Test Campaign",
"ad_account_id": "test_adaccount_456",
"status": "ACTIVE",
"objective": "AWARENESS",
"start_time": "2024-01-01T00:00:00.000Z",
"end_time": "2024-12-31T23:59:59.000Z",
"daily_budget_micro": 100000000,
"lifetime_spend_cap_micro": 0,
"buy_model": "AUCTION",
"regulations": {}
}
}
]
}

View File

@@ -0,0 +1,25 @@
{
"request_status": "SUCCESS",
"request_id": "test_request_id",
"creatives": [
{
"sub_request_status": "SUCCESS",
"creative": {
"id": "test_creative_123",
"updated_at": "2024-01-15T10:00:00.000Z",
"created_at": "2023-01-01T00:00:00.000Z",
"name": "Test Creative",
"ad_account_id": "test_adaccount_456",
"type": "SNAP_AD",
"packaging_status": "SUCCESS",
"review_status": "APPROVED",
"shareable": true,
"headline": "Test Headline",
"brand_name": "Test Brand",
"call_to_action": "INSTALL_NOW",
"top_snap_media_id": "test_media_id",
"top_snap_crop_position": "MIDDLE"
}
}
]
}

View File

@@ -0,0 +1,5 @@
{
"request_status": "ERROR",
"request_id": "test_request_id",
"msg": "Unauthorized"
}

View File

@@ -0,0 +1,5 @@
{
"request_status": "ERROR",
"request_id": "test_request_id",
"msg": "Rate limit exceeded"
}

View File

@@ -0,0 +1,21 @@
{
"request_status": "SUCCESS",
"request_id": "test_request_id",
"media": [
{
"sub_request_status": "SUCCESS",
"media": {
"id": "test_media_123",
"updated_at": "2024-01-15T10:00:00.000Z",
"created_at": "2023-01-01T00:00:00.000Z",
"name": "Test Media",
"ad_account_id": "test_adaccount_456",
"type": "VIDEO",
"media_status": "READY",
"file_name": "test_video.mp4",
"download_link": "https://example.com/test_video.mp4",
"duration_secs": 10.5
}
}
]
}

View File

@@ -0,0 +1,7 @@
{
"access_token": "test_access_token",
"token_type": "Bearer",
"expires_in": 1800,
"refresh_token": "test_refresh_token",
"scope": "snapchat-marketing-api"
}

View File

@@ -0,0 +1,28 @@
{
"request_status": "SUCCESS",
"request_id": "test_request_id",
"organizations": [
{
"sub_request_status": "SUCCESS",
"organization": {
"id": "test_org_123",
"updated_at": "2024-01-15T10:00:00.000Z",
"created_at": "2023-01-01T00:00:00.000Z",
"name": "Test Organization",
"address_line_1": "123 Test St",
"locality": "Test City",
"administrative_district_level_1": "CA",
"country": "US",
"postal_code": "12345",
"type": "ENTERPRISE",
"state": "ACTIVE",
"configuration_settings": {},
"accepted_term_version": "1",
"contact_name": "Test Contact",
"contact_email": "test@example.com",
"contact_phone": "+1234567890",
"roles": ["ADMIN"]
}
}
]
}

View File

@@ -0,0 +1,25 @@
{
"request_status": "SUCCESS",
"request_id": "test_request_id",
"segments": [
{
"sub_request_status": "SUCCESS",
"segment": {
"id": "test_segment_123",
"updated_at": "2024-01-15T10:00:00.000Z",
"created_at": "2023-01-01T00:00:00.000Z",
"name": "Test Segment",
"ad_account_id": "test_adaccount_456",
"description": "Test segment description",
"status": "ACTIVE",
"source_type": "FIRST_PARTY",
"retention_in_days": 180,
"approximate_number_users": 1000,
"upload_status": "COMPLETE",
"targetable_status": "READY",
"organization_id": "test_org_123",
"visible_to": ["ALL_ACCOUNTS"]
}
}
]
}

View File

@@ -0,0 +1,23 @@
{
"request_status": "SUCCESS",
"request_id": "test_request_id",
"lifetime_stats": [
{
"sub_request_status": "SUCCESS",
"lifetime_stat": {
"id": "test_entity_id",
"type": "AD_ACCOUNT",
"granularity": "LIFETIME",
"stats": {
"impressions": 100000,
"swipes": 5000,
"spend": 500000000,
"video_views": 80000,
"android_installs": 1000,
"ios_installs": 1500,
"total_installs": 2500
}
}
}
]
}

View File

@@ -0,0 +1,31 @@
{
"request_status": "SUCCESS",
"request_id": "test_request_id",
"timeseries_stats": [
{
"sub_request_status": "SUCCESS",
"timeseries_stat": {
"id": "test_entity_id",
"type": "AD_ACCOUNT",
"granularity": "HOUR",
"start_time": "2024-01-15T00:00:00.000-0800",
"end_time": "2024-01-15T01:00:00.000-0800",
"timeseries": [
{
"start_time": "2024-01-15T00:00:00.000-0800",
"end_time": "2024-01-15T01:00:00.000-0800",
"stats": {
"impressions": 1000,
"swipes": 50,
"spend": 5000000,
"video_views": 800,
"android_installs": 10,
"ios_installs": 15,
"total_installs": 25
}
}
]
}
}
]
}