1
0
mirror of synced 2025-12-19 10:00:34 -05:00

test(source-instagram): Add comprehensive mock server tests (#70951)

Co-authored-by: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com>
Co-authored-by: sophie.cui@airbyte.io <sophie.cui@airbyte.io>
Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com>
This commit is contained in:
devin-ai-integration[bot]
2025-12-17 14:22:06 -08:00
committed by GitHub
parent 128e8f7b98
commit 4666499cc7
15 changed files with 961 additions and 166 deletions

View File

@@ -1,158 +0,0 @@
#
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
#
import json
import unittest
from unittest import TestCase
import pytest
from airbyte_cdk.models import SyncMode
from airbyte_cdk.test.entrypoint_wrapper import EntrypointOutput
from airbyte_cdk.test.mock_http import HttpMocker, HttpResponse
from airbyte_cdk.test.mock_http.response_builder import (
FieldPath,
HttpResponseBuilder,
RecordBuilder,
create_record_builder,
create_response_builder,
find_template,
)
from .config import BUSINESS_ACCOUNT_ID, ConfigBuilder
from .pagination import NEXT_PAGE_TOKEN, InstagramPaginationStrategy
from .request_builder import RequestBuilder, get_account_request
from .response_builder import get_account_response
from .utils import config, read_output
# Fields requested from the parent `stories` endpoint (passed to `with_fields` in `_get_parent_request`).
PARENT_FIELDS = [
"caption",
"id",
"ig_id",
"like_count",
"media_type",
"media_product_type",
"media_url",
"owner",
"permalink",
"shortcode",
"thumbnail_url",
"timestamp",
"username",
]
# Stream names: used to build fixture template names and to select the stream that is read.
_PARENT_STREAM_NAME = "stories"
_STREAM_NAME = "story_insights"
# Story ids used as `media_id` when mocking the child (insights) requests.
STORIES_ID = "3874523487643"
STORIES_ID_ERROR_CODE_10 = "3874523487644"
# Scenario names; templates are looked up as "<stream name>_for_<scenario>".
HAPPY_PATH = "story_insights_happy_path"
ERROR_10 = "story_insights_error_code_10"
# Metrics sent as the `metric` query parameter and asserted to be present on every output record.
_METRICS = ["reach", "replies", "follows", "profile_visits", "shares", "total_interactions"]
def _get_parent_request() -> RequestBuilder:
    """Return the request builder for the parent ``stories`` endpoint (limit 100, ``PARENT_FIELDS``)."""
    stories_request = RequestBuilder.get_stories_endpoint(item_id=BUSINESS_ACCOUNT_ID)
    stories_request = stories_request.with_limit(100)
    return stories_request.with_fields(PARENT_FIELDS)
def _get_child_request(media_id, metric) -> RequestBuilder:
    """Return the request builder for the insights endpoint of ``media_id`` with the given ``metric`` param."""
    insights_request = RequestBuilder.get_media_insights_endpoint(item_id=media_id)
    return insights_request.with_custom_param("metric", metric, with_format=True)
def _get_response(stream_name: str, test: str = None, with_pagination_strategy: bool = True) -> HttpResponseBuilder:
scenario = ""
if test:
scenario = f"_for_{test}"
kwargs = {
"response_template": find_template(f"{stream_name}{scenario}", __file__),
"records_path": FieldPath("data"),
"pagination_strategy": InstagramPaginationStrategy(request=_get_parent_request().build(), next_page_token=NEXT_PAGE_TOKEN),
}
if with_pagination_strategy:
kwargs["pagination_strategy"] = InstagramPaginationStrategy(request=_get_parent_request().build(), next_page_token=NEXT_PAGE_TOKEN)
return create_response_builder(**kwargs)
def _record(stream_name: str, test: str = None) -> RecordBuilder:
scenario = ""
if test:
scenario = f"_for_{test}"
return create_record_builder(
response_template=find_template(f"{stream_name}{scenario}", __file__),
records_path=FieldPath("data"),
record_id_path=FieldPath("id"),
)
class TestFullRefresh(TestCase):
    """Full-refresh reads of the ``story_insights`` stream against mocked HTTP responses."""

    @staticmethod
    def _read(config_: ConfigBuilder, expecting_exception: bool = False) -> EntrypointOutput:
        """Read ``story_insights`` in full-refresh mode and return the entrypoint output."""
        read_kwargs = {
            "config_builder": config_,
            "stream_name": _STREAM_NAME,
            "sync_mode": SyncMode.full_refresh,
            "expecting_exception": expecting_exception,
        }
        return read_output(**read_kwargs)

    @staticmethod
    def _assert_single_insights_record(output: EntrypointOutput) -> None:
        """Assert that exactly one record was emitted, with ids populated and every metric present."""
        assert len(output.records) == 1
        data = output.records[0].record.data
        assert data["page_id"]
        assert data["business_account_id"]
        assert data["id"]
        for metric in _METRICS:
            assert metric in data

    @HttpMocker()
    def test_instagram_story_insights(self, http_mocker: HttpMocker) -> None:
        """Happy path: the parent template's story yields one insights record."""
        scenario = HAPPY_PATH

        # Accounts request.
        http_mocker.get(get_account_request().build(), get_account_response())

        # Parent (stories) request.
        stories_request = _get_parent_request().build()
        stories_response = (
            _get_response(stream_name=_PARENT_STREAM_NAME, test=scenario)
            .with_record(_record(stream_name=_PARENT_STREAM_NAME, test=scenario))
            .build()
        )
        http_mocker.get(stories_request, stories_response)

        # Child (insights) request for the story.
        insights_request = _get_child_request(media_id=STORIES_ID, metric=_METRICS).build()
        insights_body = json.dumps(find_template(f"{_STREAM_NAME}_for_{scenario}", __file__))
        http_mocker.get(insights_request, HttpResponse(insights_body, 200))

        self._assert_single_insights_record(self._read(config_=config()))

    @HttpMocker()
    def test_instagram_story_insights_for_error_code_30(self, http_mocker: HttpMocker) -> None:
        """A 400 insights response for one story is skipped while the other story is still read.

        NOTE: despite the ``_30`` suffix in the method name, the scenario used here is ``ERROR_10``.
        """
        scenario = ERROR_10

        http_mocker.get(get_account_request().build(), get_account_response())

        # Parent (stories) request, answered straight from the error-scenario template.
        stories_body = json.dumps(find_template(f"{_PARENT_STREAM_NAME}_for_{scenario}", __file__))
        http_mocker.get(_get_parent_request().build(), HttpResponse(stories_body, 200))

        # Story answered with the happy-path insights.
        good_request = _get_child_request(media_id=STORIES_ID, metric=_METRICS).build()
        good_body = json.dumps(find_template(f"{_STREAM_NAME}_for_{HAPPY_PATH}", __file__))
        http_mocker.get(good_request, HttpResponse(good_body, 200))

        # Story answered with the error payload and HTTP 400.
        failing_request = _get_child_request(media_id=STORIES_ID_ERROR_CODE_10, metric=_METRICS).build()
        failing_body = json.dumps(find_template(f"{_STREAM_NAME}_for_{scenario}", __file__))
        http_mocker.get(failing_request, HttpResponse(failing_body, 400))

        # The error is expected to be ignored, leaving only the good record.
        self._assert_single_insights_record(self._read(config_=config()))

View File

@@ -24,5 +24,9 @@ class ConfigBuilder:
"start_date": START_DATE,
}
def with_start_date(self, start_date: str) -> "ConfigBuilder":
    """Set the ``start_date`` entry of the config being built and return the builder for chaining."""
    self._config.update({"start_date": start_date})
    return self
def build(self) -> MutableMapping[str, Any]:
return self._config

View File

@@ -6,7 +6,7 @@ from __future__ import annotations
from typing import List, Optional, Union
from airbyte_cdk.connector_builder.connector_builder_handler import resolve_manifest
from airbyte_cdk.test.mock_http.request import HttpRequest
from airbyte_cdk.test.mock_http.request import ANY_QUERY_PARAMS, HttpRequest
from ..conftest import get_source
from .config import ACCOUNTS_FIELDS
@@ -81,6 +81,14 @@ class RequestBuilder:
self._query_params[param] = value
return self
def with_any_query_params(self) -> RequestBuilder:
    """Replace the builder's query params with the ``ANY_QUERY_PARAMS`` sentinel.

    Intended for requests whose query string cannot be pinned down in a test
    (for example, parameters derived from a datetime cursor).

    Returns:
        This builder, so calls can be chained.
    """
    wildcard = ANY_QUERY_PARAMS
    self._query_params = wildcard
    return self
@staticmethod
def _get_formatted_fields(fields: List[str]) -> str:
return ",".join(fields)

View File

@@ -27,3 +27,19 @@ def get_account_response() -> HttpResponse:
"paging": {"cursors": {"before": "before_token"}},
}
return build_response(body=response, status_code=HTTPStatus.OK)
# Ids of the second page and its Instagram business account, as returned by get_multiple_accounts_response().
SECOND_PAGE_ID = "333333333333333"
SECOND_BUSINESS_ACCOUNT_ID = "444444444444444"
def get_multiple_accounts_response() -> HttpResponse:
    """Build an OK accounts response listing two pages, each with its own Instagram business account."""
    first_account = {"id": PAGE_ID, "name": "AccountName", "instagram_business_account": {"id": BUSINESS_ACCOUNT_ID}}
    second_account = {"id": SECOND_PAGE_ID, "name": "SecondAccount", "instagram_business_account": {"id": SECOND_BUSINESS_ACCOUNT_ID}}
    body = {
        "data": [first_account, second_account],
        "paging": {"cursors": {"before": "before_token"}},
    }
    return build_response(body=body, status_code=HTTPStatus.OK)

View File

@@ -67,6 +67,11 @@ class TestFullRefresh(TestCase):
output = self._read(config_=config())
assert len(output.records) == 1
# Verify transformations are applied (page_id, business_account_id in account field)
record = output.records[0].record.data
assert "account" in record
assert "page_id" in record["account"]
assert "business_account_id" in record["account"]
@HttpMocker()
def test_accounts_with_no_instagram_business_account_field(self, http_mocker: HttpMocker) -> None:

View File

@@ -20,7 +20,7 @@ from airbyte_cdk.test.mock_http.response_builder import (
from .config import BUSINESS_ACCOUNT_ID, ConfigBuilder
from .pagination import NEXT_PAGE_TOKEN, InstagramPaginationStrategy
from .request_builder import RequestBuilder, get_account_request
from .response_builder import get_account_response
from .response_builder import SECOND_BUSINESS_ACCOUNT_ID, get_account_response, get_multiple_accounts_response
from .utils import config, read_output
@@ -96,6 +96,13 @@ class TestFullRefresh(TestCase):
output = self._read(config_=config())
assert len(output.records) == 1
# Verify transformations are applied
record = output.records[0].record.data
assert "page_id" in record
assert "business_account_id" in record
assert "media_insights_info" in record
assert record["page_id"] is not None
assert record["business_account_id"] is not None
@HttpMocker()
def test_given_multiple_pages_when_read_then_return_records(self, http_mocker: HttpMocker) -> None:
@@ -158,3 +165,29 @@ class TestFullRefresh(TestCase):
assert "ig_id" in child
assert "media_type" in child
assert "owner" in child
@HttpMocker()
def test_substream_with_multiple_parent_accounts(self, http_mocker: HttpMocker) -> None:
    """Read the media stream when the accounts endpoint returns two parent accounts."""
    http_mocker.get(get_account_request().build(), get_multiple_accounts_response())

    # Media requests for both accounts, each answered with a one-record page.
    default_account_request = _get_request().build()
    http_mocker.get(default_account_request, _get_response().with_record(_record()).build())

    second_account_request = (
        RequestBuilder.get_media_endpoint(item_id=SECOND_BUSINESS_ACCOUNT_ID).with_limit(100).with_fields(_FIELDS).build()
    )
    http_mocker.get(second_account_request, _get_response().with_record(_record()).build())

    output = self._read(config_=config())

    # One record per account.
    assert len(output.records) == 2
    # Every record carries the fields added by the stream's transformations.
    for message in output.records:
        data = message.record.data
        assert "page_id" in data
        assert "business_account_id" in data
        assert "media_insights_info" in data

View File

@@ -267,9 +267,87 @@ class TestFullRefresh(TestCase):
assert output.records[0].record.data["id"]
for metric in _METRICS[MEDIA_ID_GENERAL_MEDIA]:
assert metric in output.records[0].record.data
# For IGNORE handlers, verify no ERROR logs are produced
assert not any(log.log.level == "ERROR" for log in output.logs)
@HttpMocker()
def test_substream_with_multiple_parent_records(self, http_mocker: HttpMocker) -> None:
    """Read media_insights when the parent media stream returns two records (a reel and a feed image)."""
    http_mocker.get(get_account_request().build(), get_account_response())

    reels_media = {
        "caption": "a caption",
        "comments_count": 2,
        "id": MEDIA_ID_REELS,
        "ig_id": "3123724930722523505",
        "is_comment_enabled": True,
        "like_count": 12,
        "media_type": "VIDEO",
        "media_product_type": "REELS",
        "media_url": "https://fakecontent.com/path/to/content",
        "owner": {"id": "41408147298757123"},
        "permalink": "https://instagram.com/permalink/123",
        "shortcode": "HGagdsy38",
        "thumbnail_url": "https://fakecontent.cdninstagram.com/v/somepath/",
        "timestamp": "2023-06-12T19:20:02+0000",
        "username": "username",
    }
    # Unlike the reel, this record has no "thumbnail_url" key.
    general_media = {
        "caption": "another caption",
        "comments_count": 0,
        "id": MEDIA_ID_GENERAL_MEDIA,
        "ig_id": "2034885879374760912",
        "is_comment_enabled": True,
        "like_count": 52,
        "media_type": "IMAGE",
        "media_product_type": "FEED",
        "media_url": "https://fakecontent.com/path/to/content2",
        "owner": {"id": "41408147298757123"},
        "permalink": "https://instagram.com/permalink/456",
        "shortcode": "ABC123",
        "timestamp": "2019-05-02T11:42:01+0000",
        "username": "username",
    }
    parent_response = {
        "data": [reels_media, general_media],
        "paging": {"cursors": {"before": "cursor123"}},
    }
    http_mocker.get(_get_parent_request().build(), HttpResponse(json.dumps(parent_response), 200))

    # One insights request per parent record, each answered from its own template.
    for media_id, scenario in ((MEDIA_ID_REELS, REELS), (MEDIA_ID_GENERAL_MEDIA, GENERAL_MEDIA)):
        http_mocker.get(
            _get_child_request(media_id=media_id, metric=_METRICS[media_id]).build(),
            HttpResponse(json.dumps(find_template(f"{_STREAM_NAME}_for_{scenario}", __file__)), 200),
        )

    output = self._read(config_=config())

    # One record per parent record.
    assert len(output.records) == 2
    record_ids = {message.record.data["id"] for message in output.records}
    assert MEDIA_ID_REELS in record_ids
    assert MEDIA_ID_GENERAL_MEDIA in record_ids
    # Transformation fields must be populated on every record.
    for message in output.records:
        assert message.record.data["page_id"]
        assert message.record.data["business_account_id"]
@HttpMocker()
def test_instagram_insights_error_posted_before_business(self, http_mocker: HttpMocker) -> None:
"""Test that error_subcode 2108006 (posted before business conversion) is gracefully ignored.
Verifies both error code and error message assertion per playbook requirements.
"""
test = ERROR_POSTED_BEFORE_BUSINESS
http_mocker.get(
get_account_request().build(),
@@ -298,9 +376,18 @@ class TestFullRefresh(TestCase):
assert output.records[0].record.data["id"]
for metric in _METRICS[MEDIA_ID_GENERAL_MEDIA]:
assert metric in output.records[0].record.data
assert not any(log.log.level == "ERROR" for log in output.logs)
log_messages = [log.log.message for log in output.logs]
assert any(
"Insights error for business_account_id" in msg for msg in log_messages
), f"Expected 'Insights error for business_account_id' in logs but got: {log_messages}"
@HttpMocker()
def test_instagram_insights_error_with_wrong_permissions(self, http_mocker: HttpMocker) -> None:
"""Test that error code 100 with subcode 33 (wrong permissions) is gracefully ignored.
Verifies both error code and error message assertion per playbook requirements.
"""
test = ERROR_WITH_WRONG_PERMISSIONS
http_mocker.get(
get_account_request().build(),
@@ -323,16 +410,24 @@ class TestFullRefresh(TestCase):
)
output = self._read(config_=config())
# error was ignored and correct record was processed
assert len(output.records) == 1
assert output.records[0].record.data["page_id"]
assert output.records[0].record.data["business_account_id"]
assert output.records[0].record.data["id"]
for metric in _METRICS[MEDIA_ID_GENERAL_MEDIA]:
assert metric in output.records[0].record.data
assert not any(log.log.level == "ERROR" for log in output.logs)
log_messages = [log.log.message for log in output.logs]
assert any(
"Check provided permissions for" in msg for msg in log_messages
), f"Expected 'Check provided permissions for' in logs but got: {log_messages}"
@HttpMocker()
def test_instagram_insights_error_with_wrong_permissions_code_10(self, http_mocker: HttpMocker) -> None:
"""Test that error code 10 with permission denied message is gracefully ignored.
Verifies both error code and error message assertion per playbook requirements.
"""
test = ERROR_WITH_WRONG_PERMISSIONS_CODE_10
http_mocker.get(
get_account_request().build(),
@@ -355,10 +450,14 @@ class TestFullRefresh(TestCase):
)
output = self._read(config_=config())
# error was ignored and correct record was processed
assert len(output.records) == 1
assert output.records[0].record.data["page_id"]
assert output.records[0].record.data["business_account_id"]
assert output.records[0].record.data["id"]
for metric in _METRICS[MEDIA_ID_GENERAL_MEDIA]:
assert metric in output.records[0].record.data
assert not any(log.log.level == "ERROR" for log in output.logs)
log_messages = [log.log.message for log in output.logs]
assert any(
"Check provided permissions for" in msg for msg in log_messages
), f"Expected 'Check provided permissions for' in logs but got: {log_messages}"

View File

@@ -19,7 +19,7 @@ from airbyte_cdk.test.mock_http.response_builder import (
from .config import BUSINESS_ACCOUNT_ID, ConfigBuilder
from .pagination import NEXT_PAGE_TOKEN, InstagramPaginationStrategy
from .request_builder import RequestBuilder, get_account_request
from .response_builder import get_account_response
from .response_builder import SECOND_BUSINESS_ACCOUNT_ID, get_account_response, get_multiple_accounts_response
from .utils import config, read_output
@@ -85,6 +85,12 @@ class TestFullRefresh(TestCase):
output = self._read(config_=config())
assert len(output.records) == 1
# Verify transformations are applied (page_id, business_account_id, story_insights_info, timestamp)
record = output.records[0].record.data
assert "page_id" in record
assert "business_account_id" in record
assert "story_insights_info" in record
assert "timestamp" in record
@HttpMocker()
def test_given_multiple_pages_when_read_then_return_records(self, http_mocker: HttpMocker) -> None:
@@ -104,3 +110,29 @@ class TestFullRefresh(TestCase):
output = self._read(config_=config())
assert len(output.records) == 3
@HttpMocker()
def test_substream_with_multiple_parent_accounts(self, http_mocker: HttpMocker) -> None:
    """Read the stories stream when the accounts endpoint returns two parent accounts."""
    http_mocker.get(get_account_request().build(), get_multiple_accounts_response())

    # Stories requests for both accounts, each answered with a one-record page.
    default_account_request = _get_request().build()
    http_mocker.get(default_account_request, _get_response().with_record(_record()).build())

    second_account_request = (
        RequestBuilder.get_stories_endpoint(item_id=SECOND_BUSINESS_ACCOUNT_ID).with_limit(100).with_fields(FIELDS).build()
    )
    http_mocker.get(second_account_request, _get_response().with_record(_record()).build())

    output = self._read(config_=config())

    # One record per account.
    assert len(output.records) == 2
    # Every record carries the fields added by the stream's transformations.
    for message in output.records:
        data = message.record.data
        assert "page_id" in data
        assert "business_account_id" in data
        assert "story_insights_info" in data

View File

@@ -0,0 +1,284 @@
#
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
#
import json
from unittest import TestCase
from airbyte_cdk.models import SyncMode
from airbyte_cdk.test.entrypoint_wrapper import EntrypointOutput
from airbyte_cdk.test.mock_http import HttpMocker, HttpResponse
from airbyte_cdk.test.mock_http.response_builder import (
FieldPath,
HttpResponseBuilder,
RecordBuilder,
create_record_builder,
create_response_builder,
find_template,
)
from .config import BUSINESS_ACCOUNT_ID, ConfigBuilder
from .pagination import NEXT_PAGE_TOKEN, InstagramPaginationStrategy
from .request_builder import RequestBuilder, get_account_request
from .response_builder import get_account_response
from .utils import config, read_output
# Fields requested from the parent `stories` endpoint (passed to `with_fields` in `_get_parent_request`).
PARENT_FIELDS = [
"caption",
"id",
"ig_id",
"like_count",
"media_type",
"media_product_type",
"media_url",
"owner",
"permalink",
"shortcode",
"thumbnail_url",
"timestamp",
"username",
]
# Stream names: used to build fixture template names and to select the stream that is read.
_PARENT_STREAM_NAME = "stories"
_STREAM_NAME = "story_insights"
# Story ids used as `media_id` when mocking the child (insights) requests.
STORIES_ID = "3874523487643"
STORIES_ID_ERROR_CODE_10 = "3874523487644"
# Scenario names; templates are looked up as "<stream name>_for_<scenario>".
HAPPY_PATH = "story_insights_happy_path"
ERROR_10 = "story_insights_error_code_10"
# Metrics sent as the `metric` query parameter and asserted to be present on every output record.
_METRICS = ["reach", "replies", "follows", "profile_visits", "shares", "total_interactions"]
def _get_parent_request() -> RequestBuilder:
    """Return the request builder for the parent ``stories`` endpoint (limit 100, ``PARENT_FIELDS``)."""
    stories_request = RequestBuilder.get_stories_endpoint(item_id=BUSINESS_ACCOUNT_ID)
    stories_request = stories_request.with_limit(100)
    return stories_request.with_fields(PARENT_FIELDS)
def _get_child_request(media_id, metric) -> RequestBuilder:
    """Return the request builder for the insights endpoint of ``media_id`` with the given ``metric`` param."""
    insights_request = RequestBuilder.get_media_insights_endpoint(item_id=media_id)
    return insights_request.with_custom_param("metric", metric, with_format=True)
def _get_response(stream_name: str, test: str = None, with_pagination_strategy: bool = True) -> HttpResponseBuilder:
scenario = ""
if test:
scenario = f"_for_{test}"
kwargs = {
"response_template": find_template(f"{stream_name}{scenario}", __file__),
"records_path": FieldPath("data"),
"pagination_strategy": InstagramPaginationStrategy(request=_get_parent_request().build(), next_page_token=NEXT_PAGE_TOKEN),
}
if with_pagination_strategy:
kwargs["pagination_strategy"] = InstagramPaginationStrategy(request=_get_parent_request().build(), next_page_token=NEXT_PAGE_TOKEN)
return create_response_builder(**kwargs)
def _record(stream_name: str, test: str = None) -> RecordBuilder:
scenario = ""
if test:
scenario = f"_for_{test}"
return create_record_builder(
response_template=find_template(f"{stream_name}{scenario}", __file__),
records_path=FieldPath("data"),
record_id_path=FieldPath("id"),
)
class TestFullRefresh(TestCase):
    """Full-refresh reads of the ``story_insights`` stream against mocked HTTP responses."""

    @staticmethod
    def _read(config_: ConfigBuilder, expecting_exception: bool = False) -> EntrypointOutput:
        """Read ``story_insights`` in full-refresh mode and return the entrypoint output."""
        read_kwargs = {
            "config_builder": config_,
            "stream_name": _STREAM_NAME,
            "sync_mode": SyncMode.full_refresh,
            "expecting_exception": expecting_exception,
        }
        return read_output(**read_kwargs)

    @staticmethod
    def _assert_single_insights_record(output: EntrypointOutput) -> None:
        """Assert that exactly one record was emitted, with ids populated and every metric present."""
        assert len(output.records) == 1
        data = output.records[0].record.data
        assert data["page_id"]
        assert data["business_account_id"]
        assert data["id"]
        for metric in _METRICS:
            assert metric in data

    @HttpMocker()
    def test_instagram_story_insights(self, http_mocker: HttpMocker) -> None:
        """Happy path: the parent template's story yields one insights record."""
        scenario = HAPPY_PATH

        # Accounts request.
        http_mocker.get(get_account_request().build(), get_account_response())

        # Parent (stories) request.
        stories_request = _get_parent_request().build()
        stories_response = (
            _get_response(stream_name=_PARENT_STREAM_NAME, test=scenario)
            .with_record(_record(stream_name=_PARENT_STREAM_NAME, test=scenario))
            .build()
        )
        http_mocker.get(stories_request, stories_response)

        # Child (insights) request for the story.
        insights_request = _get_child_request(media_id=STORIES_ID, metric=_METRICS).build()
        insights_body = json.dumps(find_template(f"{_STREAM_NAME}_for_{scenario}", __file__))
        http_mocker.get(insights_request, HttpResponse(insights_body, 200))

        self._assert_single_insights_record(self._read(config_=config()))

    @HttpMocker()
    def test_instagram_story_insights_for_error_code_30(self, http_mocker: HttpMocker) -> None:
        """A 400 insights response for one story is skipped without ERROR logs; the other story is read.

        Also checks that a log message containing 'Insights error' is emitted.
        NOTE: despite the ``_30`` suffix in the method name, the scenario used here is ``ERROR_10``.
        """
        scenario = ERROR_10

        http_mocker.get(get_account_request().build(), get_account_response())

        # Parent (stories) request, answered straight from the error-scenario template.
        stories_body = json.dumps(find_template(f"{_PARENT_STREAM_NAME}_for_{scenario}", __file__))
        http_mocker.get(_get_parent_request().build(), HttpResponse(stories_body, 200))

        # Story answered with the happy-path insights.
        good_request = _get_child_request(media_id=STORIES_ID, metric=_METRICS).build()
        good_body = json.dumps(find_template(f"{_STREAM_NAME}_for_{HAPPY_PATH}", __file__))
        http_mocker.get(good_request, HttpResponse(good_body, 200))

        # Story answered with the error payload and HTTP 400.
        failing_request = _get_child_request(media_id=STORIES_ID_ERROR_CODE_10, metric=_METRICS).build()
        failing_body = json.dumps(find_template(f"{_STREAM_NAME}_for_{scenario}", __file__))
        http_mocker.get(failing_request, HttpResponse(failing_body, 400))

        output = self._read(config_=config())

        self._assert_single_insights_record(output)
        # An ignored error must not surface as an ERROR-level log...
        assert not any(log.log.level == "ERROR" for log in output.logs)
        # ...but the handler's message must still be logged.
        log_messages = [log.log.message for log in output.logs]
        assert any("Insights error" in msg for msg in log_messages), f"Expected 'Insights error' in logs but got: {log_messages}"

    @HttpMocker()
    def test_substream_with_multiple_parent_records(self, http_mocker: HttpMocker) -> None:
        """Read story_insights when the parent stories stream returns two records."""
        second_story_id = "3874523487645"

        http_mocker.get(get_account_request().build(), get_account_response())

        # Parent stream returning two stories.
        first_story = {
            "id": STORIES_ID,
            "ig_id": "ig_id_1",
            "like_count": 0,
            "media_type": "VIDEO",
            "media_product_type": "STORY",
            "media_url": "https://fakecontent.cdninstagram.com/path1/path2/some_value",
            "owner": {"id": "owner_id"},
            "permalink": "https://placeholder.com/stories/username/some_id_value",
            "shortcode": "ERUY34867_3",
            "thumbnail_url": "https://content.cdnfaker.com/path1/path2/some_value",
            "timestamp": "2024-06-17T19:39:18+0000",
            "username": "username",
        }
        second_story = {
            "id": second_story_id,
            "ig_id": "ig_id_2",
            "like_count": 5,
            "media_type": "IMAGE",
            "media_product_type": "STORY",
            "media_url": "https://fakecontent.cdninstagram.com/path1/path2/another_value",
            "owner": {"id": "owner_id"},
            "permalink": "https://placeholder.com/stories/username/another_id_value",
            "shortcode": "XYZ98765_4",
            "thumbnail_url": "https://content.cdnfaker.com/path1/path2/another_value",
            "timestamp": "2024-06-18T10:15:30+0000",
            "username": "username",
        }
        parent_response = {
            "data": [first_story, second_story],
            "paging": {"cursors": {"before": "cursor123"}},
        }
        http_mocker.get(_get_parent_request().build(), HttpResponse(json.dumps(parent_response), 200))

        # Insights for the first story come from the happy-path template.
        http_mocker.get(
            _get_child_request(media_id=STORIES_ID, metric=_METRICS).build(),
            HttpResponse(json.dumps(find_template(f"{_STREAM_NAME}_for_{HAPPY_PATH}", __file__)), 200),
        )

        # Insights for the second story are built inline, one lifetime entry per metric, keyed by its id.
        second_story_values = {
            "reach": 150,
            "replies": 3,
            "follows": 2,
            "profile_visits": 10,
            "shares": 1,
            "total_interactions": 16,
        }
        story_insights_response_2 = {
            "data": [
                {
                    "name": metric,
                    "period": "lifetime",
                    "values": [{"value": value}],
                    # "profile_visits" -> "Profile Visits", etc.
                    "title": metric.replace("_", " ").title(),
                    "description": "desc",
                    "id": f"{second_story_id}/insights/{metric}/lifetime",
                }
                for metric, value in second_story_values.items()
            ]
        }
        http_mocker.get(
            _get_child_request(media_id=second_story_id, metric=_METRICS).build(),
            HttpResponse(json.dumps(story_insights_response_2), 200),
        )

        output = self._read(config_=config())

        # One record per parent story.
        assert len(output.records) == 2
        record_ids = {message.record.data["id"] for message in output.records}
        assert STORIES_ID in record_ids
        assert second_story_id in record_ids
        # Transformation fields and all metrics must be present on every record.
        for message in output.records:
            data = message.record.data
            assert data["page_id"]
            assert data["business_account_id"]
            for metric in _METRICS:
                assert metric in data

View File

@@ -0,0 +1,400 @@
#
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
#
import json
from unittest import TestCase
import freezegun
from airbyte_cdk.models import SyncMode
from airbyte_cdk.test.entrypoint_wrapper import EntrypointOutput
from airbyte_cdk.test.mock_http import HttpMocker, HttpResponse
from airbyte_cdk.test.state_builder import StateBuilder
from .config import BUSINESS_ACCOUNT_ID, PAGE_ID, ConfigBuilder
from .request_builder import RequestBuilder, get_account_request
from .response_builder import SECOND_BUSINESS_ACCOUNT_ID, SECOND_PAGE_ID, get_account_response, get_multiple_accounts_response
from .utils import read_output
# Name of the stream read by the tests in this module.
_STREAM_NAME = "user_insights"
# Instant passed to freezegun.freeze_time for the tests below.
_FROZEN_TIME = "2024-01-15T12:00:00Z"
def _get_user_insights_request_any_params(business_account_id: str) -> RequestBuilder:
    """Return a user_insights request builder for ``business_account_id`` that matches any query params.

    Per the original author's note, the stream combines a daily datetime cursor with several
    period/metric query-property chunks, so a single sync issues many differently parameterised
    requests (NOTE(review): stream configuration is not visible from this file). Matching on any
    query params lets one mock cover all of them when a test does not depend on the exact parameters.
    """
    insights_request = RequestBuilder.get_user_lifetime_insights_endpoint(item_id=business_account_id)
    return insights_request.with_any_query_params()
def _get_user_insights_request_with_params(business_account_id: str, since: str, until: str, period: str, metric: str) -> RequestBuilder:
    """Return a user_insights request builder pinned to explicit ``since``/``until``/``period``/``metric`` params."""
    request = RequestBuilder.get_user_lifetime_insights_endpoint(item_id=business_account_id)
    # Apply the params in the same order as the original chained calls.
    for name, value in (("since", since), ("until", until), ("period", period), ("metric", metric)):
        request = request.with_custom_param(name, value)
    return request
def _build_user_insights_response() -> HttpResponse:
    """Return a 200 user_insights response with one daily ``follower_count`` and one daily ``reach`` entry."""
    # (metric name, title, description, value) for each entry, in response order.
    entries = (
        ("follower_count", "Follower Count", "Total number of followers", 1000),
        ("reach", "Reach", "Total reach", 500),
    )
    payload = {
        "data": [
            {
                "name": name,
                "period": "day",
                "values": [{"value": value, "end_time": "2024-01-15T07:00:00+0000"}],
                "title": title,
                "description": description,
                "id": f"{BUSINESS_ACCOUNT_ID}/insights/{name}/day",
            }
            for name, title, description, value in entries
        ]
    }
    return HttpResponse(json.dumps(payload), 200)
def _build_error_response(code: int, message: str, error_subcode: int = None) -> HttpResponse:
    """Return a 400 response whose body is ``{"error": {...}}`` of type ``OAuthException``.

    Args:
        code: Value for the error's ``code`` field.
        message: Value for the error's ``message`` field.
        error_subcode: Value for ``error_subcode``; the key is omitted when this is ``None``.
    """
    # Appended last so the key order matches a dict that adds the subcode after construction.
    subcode_entry = {} if error_subcode is None else {"error_subcode": error_subcode}
    error = {
        "message": message,
        "type": "OAuthException",
        "code": code,
        "fbtrace_id": "ABC123",
        **subcode_entry,
    }
    return HttpResponse(json.dumps({"error": error}), 400)
class TestFullRefresh(TestCase):
    """Full-refresh reads of the ``user_insights`` stream against mocked HTTP responses."""

    @staticmethod
    def _read(config_: ConfigBuilder, expecting_exception: bool = False) -> EntrypointOutput:
        """Read ``user_insights`` in full-refresh mode and return the entrypoint output."""
        read_kwargs = {
            "config_builder": config_,
            "stream_name": _STREAM_NAME,
            "sync_mode": SyncMode.full_refresh,
            "expecting_exception": expecting_exception,
        }
        return read_output(**read_kwargs)

    @HttpMocker()
    @freezegun.freeze_time(_FROZEN_TIME)
    def test_read_records_full_refresh(self, http_mocker: HttpMocker) -> None:
        """Single-account full refresh yields one record tagged with the page and business account ids.

        ``start_date`` is set to the frozen day, and the insights mock matches any query params,
        so every request the stream makes for the account gets the same response.
        """
        http_mocker.get(get_account_request().build(), get_account_response())

        insights_request = _get_user_insights_request_any_params(BUSINESS_ACCOUNT_ID).build()
        http_mocker.get(insights_request, _build_user_insights_response())

        test_config = ConfigBuilder().with_start_date("2024-01-15T00:00:00Z")
        output = self._read(config_=test_config)

        assert len(output.records) == 1
        data = output.records[0].record.data
        assert data.get("page_id") == PAGE_ID
        assert data.get("business_account_id") == BUSINESS_ACCOUNT_ID

    @HttpMocker()
    @freezegun.freeze_time(_FROZEN_TIME)
    def test_substream_with_multiple_parent_accounts(self, http_mocker: HttpMocker) -> None:
        """Two parent accounts yield two records, each tagged with its own non-null account ids."""
        http_mocker.get(get_account_request().build(), get_multiple_accounts_response())

        # Same insights response for each of the two accounts.
        for account_id in (BUSINESS_ACCOUNT_ID, SECOND_BUSINESS_ACCOUNT_ID):
            http_mocker.get(
                _get_user_insights_request_any_params(account_id).build(),
                _build_user_insights_response(),
            )

        test_config = ConfigBuilder().with_start_date("2024-01-15T00:00:00Z")
        output = self._read(config_=test_config)

        # One record per account.
        assert len(output.records) == 2

        # Both business accounts must be represented.
        business_account_ids = {message.record.data.get("business_account_id") for message in output.records}
        assert BUSINESS_ACCOUNT_ID in business_account_ids
        assert SECOND_BUSINESS_ACCOUNT_ID in business_account_ids

        for message in output.records:
            data = message.record.data
            assert "page_id" in data
            assert data["page_id"] is not None
            assert "business_account_id" in data
            assert data["business_account_id"] is not None
class TestIncremental(TestCase):
@staticmethod
def _read(
    config_: ConfigBuilder,
    state: list = None,
    expecting_exception: bool = False,
) -> EntrypointOutput:
    """Read ``user_insights`` in incremental mode, optionally from ``state``, and return the output."""
    read_kwargs = {
        "config_builder": config_,
        "stream_name": _STREAM_NAME,
        "sync_mode": SyncMode.incremental,
        "state": state,
        "expecting_exception": expecting_exception,
    }
    return read_output(**read_kwargs)
@HttpMocker()
@freezegun.freeze_time(_FROZEN_TIME)
def test_incremental_sync_first_sync_no_state(self, http_mocker: HttpMocker) -> None:
    """First incremental sync (no prior state) emits one record and at least one state message.

    The insights mock matches any query params because, without state, the requests are
    derived from ``start_date`` rather than from a known cursor value.
    """
    http_mocker.get(get_account_request().build(), get_account_response())

    insights_request = _get_user_insights_request_any_params(BUSINESS_ACCOUNT_ID).build()
    http_mocker.get(insights_request, _build_user_insights_response())

    test_config = ConfigBuilder().with_start_date("2024-01-15T00:00:00Z")
    output = self._read(config_=test_config)

    assert len(output.records) == 1
    assert len(output.state_messages) >= 1
@HttpMocker()
@freezegun.freeze_time(_FROZEN_TIME)
def test_incremental_sync_with_prior_state(self, http_mocker: HttpMocker) -> None:
"""Test incremental sync with prior state (subsequent sync).
With prior state at 2024-01-15T00:00:00+00:00 and frozen time at 2024-01-15T12:00:00Z,
the stream should request data with since=2024-01-15T00:00:00Z.
We verify the outbound request includes the expected since parameter derived from state
by mocking specific query params for each QueryProperties chunk.
The DatetimeBasedCursor uses the state value as the starting point, and the frozen time
determines the end datetime. With step P1D, there's only one time slice from state to now.
"""
prior_state_value = "2024-01-15T00:00:00+00:00"
# Expected since value derived from state - the API uses the state value format directly
expected_since = "2024-01-15T00:00:00+00:00"
# Expected until value is the frozen time (in the same format as the API expects)
expected_until = "2024-01-15T12:00:00+00:00"
state = (
StateBuilder()
.with_stream_state(
_STREAM_NAME,
{
"states": [
{
"partition": {"business_account_id": BUSINESS_ACCOUNT_ID},
"cursor": {"date": prior_state_value},
}
]
},
)
.build()
)
http_mocker.get(
get_account_request().build(),
get_account_response(),
)
# Mock each QueryProperties chunk with specific params to validate the since parameter
# Chunk 1: period=day, metric=follower_count,reach
http_mocker.get(
_get_user_insights_request_with_params(
BUSINESS_ACCOUNT_ID, since=expected_since, until=expected_until, period="day", metric="follower_count,reach"
).build(),
_build_user_insights_response(),
)
# Chunk 2: period=week, metric=reach
http_mocker.get(
_get_user_insights_request_with_params(
BUSINESS_ACCOUNT_ID, since=expected_since, until=expected_until, period="week", metric="reach"
).build(),
_build_user_insights_response(),
)
# Chunk 3: period=days_28, metric=reach
http_mocker.get(
_get_user_insights_request_with_params(
BUSINESS_ACCOUNT_ID, since=expected_since, until=expected_until, period="days_28", metric="reach"
).build(),
_build_user_insights_response(),
)
# Chunk 4: period=lifetime, metric=online_followers
http_mocker.get(
_get_user_insights_request_with_params(
BUSINESS_ACCOUNT_ID, since=expected_since, until=expected_until, period="lifetime", metric="online_followers"
).build(),
_build_user_insights_response(),
)
test_config = ConfigBuilder().with_start_date("2024-01-14T00:00:00Z")
output = self._read(config_=test_config, state=state)
# With specific mocks for each chunk, we can now assert exact record count
# The merge strategy groups by date, and all chunks return the same date (2024-01-15T07:00:00+0000)
# so records should be merged into 1 record
assert len(output.records) == 1
assert len(output.state_messages) >= 1
# Verify the record has the expected business_account_id
record = output.records[0].record.data
assert record.get("business_account_id") == BUSINESS_ACCOUNT_ID
# Verify the record date matches the expected date from our response
# Note: The date is normalized to RFC 3339 format (+00:00) by the schema normalization
assert record.get("date") == "2024-01-15T07:00:00+00:00"
class TestErrorHandling(TestCase):
    """Test error handling for user_insights stream.

    The user_insights stream has IGNORE error handlers for:
    - error_subcode 2108006: "Insights error for business_account_id: {message}"
    - code 100 with error_subcode 33: "Check provided permissions for: {message}"
    - code 10 with specific permission message: "Check provided permissions for: {message}"

    For IGNORE handlers, we verify:
    1. No ERROR logs are produced
    2. The configured error_message appears in logs (proving the handler was triggered)
    3. Zero records are returned (graceful handling)
    """

    @staticmethod
    def _read(config_: ConfigBuilder, expecting_exception: bool = False) -> EntrypointOutput:
        """Run a full-refresh read of the stream under test and return its output."""
        return read_output(
            config_builder=config_,
            stream_name=_STREAM_NAME,
            sync_mode=SyncMode.full_refresh,
            expecting_exception=expecting_exception,
        )

    @staticmethod
    def _assert_ignored(output: EntrypointOutput, expected_log: str) -> None:
        """Assert that an error response was ignored gracefully.

        Checks, in order: no records were emitted, no log message is at ERROR
        level, and at least one log message contains ``expected_log``.
        """
        assert len(output.records) == 0

        # NOTE(review): the log level may be an Enum member rather than a plain str.
        # A non-str Enum member never compares equal to "ERROR", which would make a
        # direct `level == "ERROR"` check pass unconditionally. Comparing the
        # member's value (or the raw value when it already is a str) works for both.
        error_logs = [log.log.message for log in output.logs if getattr(log.log.level, "value", log.log.level) == "ERROR"]
        assert not error_logs, f"Expected no ERROR logs but got: {error_logs}"

        log_messages = [log.log.message for log in output.logs]
        assert any(expected_log in msg for msg in log_messages), f"Expected '{expected_log}' in logs but got: {log_messages}"

    @HttpMocker()
    @freezegun.freeze_time(_FROZEN_TIME)
    def test_error_subcode_2108006_is_ignored(self, http_mocker: HttpMocker) -> None:
        """Test that error_subcode 2108006 is gracefully ignored.

        Verifies both error code and error message assertion per playbook requirements.
        """
        http_mocker.get(
            get_account_request().build(),
            get_account_response(),
        )
        error_message = "Invalid parameter"
        http_mocker.get(
            _get_user_insights_request_any_params(BUSINESS_ACCOUNT_ID).build(),
            _build_error_response(code=100, message=error_message, error_subcode=2108006),
        )

        test_config = ConfigBuilder().with_start_date("2024-01-15T00:00:00Z")
        output = self._read(config_=test_config)

        self._assert_ignored(output, "Insights error for business_account_id")

    @HttpMocker()
    @freezegun.freeze_time(_FROZEN_TIME)
    def test_error_code_100_subcode_33_is_ignored(self, http_mocker: HttpMocker) -> None:
        """Test that error code 100 with subcode 33 is gracefully ignored.

        Verifies both error code and error message assertion per playbook requirements.
        """
        http_mocker.get(
            get_account_request().build(),
            get_account_response(),
        )
        error_message = "Unsupported get request"
        http_mocker.get(
            _get_user_insights_request_any_params(BUSINESS_ACCOUNT_ID).build(),
            _build_error_response(code=100, message=error_message, error_subcode=33),
        )

        test_config = ConfigBuilder().with_start_date("2024-01-15T00:00:00Z")
        output = self._read(config_=test_config)

        self._assert_ignored(output, "Check provided permissions for")

    @HttpMocker()
    @freezegun.freeze_time(_FROZEN_TIME)
    def test_error_code_10_permission_denied_is_ignored(self, http_mocker: HttpMocker) -> None:
        """Test that error code 10 with permission denied message is gracefully ignored.

        Verifies both error code and error message assertion per playbook requirements.
        """
        http_mocker.get(
            get_account_request().build(),
            get_account_response(),
        )
        error_message = "(#10) Application does not have permission for this action"
        http_mocker.get(
            _get_user_insights_request_any_params(BUSINESS_ACCOUNT_ID).build(),
            _build_error_response(code=10, message=error_message),
        )

        test_config = ConfigBuilder().with_start_date("2024-01-15T00:00:00Z")
        output = self._read(config_=test_config)

        self._assert_ignored(output, "Check provided permissions for")

View File

@@ -18,7 +18,7 @@ from airbyte_cdk.test.mock_http.response_builder import (
from .config import BUSINESS_ACCOUNT_ID, ConfigBuilder
from .request_builder import RequestBuilder, get_account_request
from .response_builder import get_account_response
from .response_builder import SECOND_BUSINESS_ACCOUNT_ID, get_account_response, get_multiple_accounts_response
from .utils import config, read_output
@@ -79,3 +79,47 @@ class TestFullRefresh(TestCase):
output = self._read(config_=config())
# each breakdown should produce a record
assert len(output.records) == 3
# Verify transformation: breakdown, page_id, business_account_id, and metric fields are added
for record in output.records:
assert "breakdown" in record.record.data
assert "page_id" in record.record.data
assert "business_account_id" in record.record.data
assert "metric" in record.record.data
assert record.record.data["page_id"] is not None
assert record.record.data["business_account_id"] is not None
@HttpMocker()
def test_substream_with_multiple_parent_accounts(self, http_mocker: HttpMocker) -> None:
    """Read user_lifetime_insights when the accounts lookup yields two parent accounts.

    For each of the three breakdowns a request is mocked for the first account
    (via ``_get_request()``) and for ``SECOND_BUSINESS_ACCOUNT_ID`` (built explicitly),
    each answered with a single record. Six records are expected in total, all
    carrying the ``breakdown``, ``page_id``, ``business_account_id`` and ``metric`` keys.
    """
    http_mocker.get(get_account_request().build(), get_multiple_accounts_response())

    for breakdown in ["city", "country", "age,gender"]:
        # First account: the module's default request plus the breakdown parameter.
        first_account_request = _get_request().with_custom_param("breakdown", breakdown)
        http_mocker.get(
            first_account_request.build(),
            _get_response().with_record(_record()).build(),
        )

        # Second account: same query parameters, spelled out against its own endpoint.
        second_account_request = (
            RequestBuilder.get_user_lifetime_insights_endpoint(item_id=SECOND_BUSINESS_ACCOUNT_ID)
            .with_custom_param("metric", "follower_demographics")
            .with_custom_param("period", "lifetime")
            .with_custom_param("metric_type", "total_value")
            .with_limit(100)
            .with_custom_param("breakdown", breakdown)
        )
        http_mocker.get(
            second_account_request.build(),
            _get_response().with_record(_record()).build(),
        )

    output = self._read(config_=config())

    # 2 accounts x 3 breakdowns = 6 records
    assert len(output.records) == 6

    expected_fields = ("breakdown", "page_id", "business_account_id", "metric")
    for message in output.records:
        for field in expected_fields:
            assert field in message.record.data

View File

@@ -18,7 +18,7 @@ from airbyte_cdk.test.mock_http.response_builder import (
from .config import BUSINESS_ACCOUNT_ID, ConfigBuilder
from .request_builder import RequestBuilder, get_account_request
from .response_builder import get_account_response
from .response_builder import SECOND_BUSINESS_ACCOUNT_ID, get_account_response, get_multiple_accounts_response
from .utils import config, read_output
@@ -80,3 +80,31 @@ class TestFullRefresh(TestCase):
output = self._read(config_=config())
assert len(output.records) == 1
# Verify transformation: page_id field is added from partition
assert "page_id" in output.records[0].record.data
assert output.records[0].record.data["page_id"] is not None
@HttpMocker()
def test_substream_with_multiple_parent_accounts(self, http_mocker: HttpMocker) -> None:
    """Read the users stream when the accounts lookup yields two parent accounts.

    The first account is served through the module's default ``_get_request()``;
    the second is mocked against the users endpoint of ``SECOND_BUSINESS_ACCOUNT_ID``
    with the same field list. Two records are expected, each with a non-null
    ``page_id``.
    """
    http_mocker.get(get_account_request().build(), get_multiple_accounts_response())

    # First parent account.
    http_mocker.get(
        _get_request().build(),
        _get_response().with_record(_record()).build(),
    )

    # Second parent account.
    second_account_request = RequestBuilder.get_users_endpoint(item_id=SECOND_BUSINESS_ACCOUNT_ID).with_fields(_FIELDS)
    http_mocker.get(
        second_account_request.build(),
        _get_response().with_record(_record()).build(),
    )

    output = self._read(config_=config())

    # One record per parent account.
    assert len(output.records) == 2
    for message in output.records:
        assert "page_id" in message.record.data
        assert message.record.data["page_id"] is not None

View File

@@ -30,4 +30,4 @@ def read_output(
) -> EntrypointOutput:
_catalog = catalog(stream_name, sync_mode)
_config = config_builder.build()
return read(get_source(config=_config), _config, _catalog, state, expecting_exception)
return read(get_source(config=_config, state=state), _config, _catalog, state, expecting_exception)