From 4666499cc72237ff2cd47b7c714076caf4a8710b Mon Sep 17 00:00:00 2001 From: "devin-ai-integration[bot]" <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Wed, 17 Dec 2025 14:22:06 -0800 Subject: [PATCH] test(source-instagram): Add comprehensive mock server tests (#70951) Co-authored-by: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Co-authored-by: sophie.cui@airbyte.io Co-authored-by: Octavia Squidington III --- .../integration/test_story_insights.py | 158 ------- .../{integration => mock_server}/__init__.py | 0 .../{integration => mock_server}/config.py | 4 + .../pagination.py | 0 .../request_builder.py | 10 +- .../response_builder.py | 16 + .../{integration => mock_server}/test_api.py | 5 + .../test_media.py | 35 +- .../test_media_insights.py | 103 ++++- .../test_stories.py | 34 +- .../mock_server/test_story_insights.py | 284 +++++++++++++ .../mock_server/test_user_insights.py | 400 ++++++++++++++++++ .../test_user_lifetime_insights.py | 46 +- .../test_users.py | 30 +- .../{integration => mock_server}/utils.py | 2 +- 15 files changed, 961 insertions(+), 166 deletions(-) delete mode 100644 airbyte-integrations/connectors/source-instagram/unit_tests/integration/test_story_insights.py rename airbyte-integrations/connectors/source-instagram/unit_tests/{integration => mock_server}/__init__.py (100%) rename airbyte-integrations/connectors/source-instagram/unit_tests/{integration => mock_server}/config.py (82%) rename airbyte-integrations/connectors/source-instagram/unit_tests/{integration => mock_server}/pagination.py (100%) rename airbyte-integrations/connectors/source-instagram/unit_tests/{integration => mock_server}/request_builder.py (89%) rename airbyte-integrations/connectors/source-instagram/unit_tests/{integration => mock_server}/response_builder.py (57%) rename airbyte-integrations/connectors/source-instagram/unit_tests/{integration => mock_server}/test_api.py (93%) rename airbyte-integrations/connectors/source-instagram/unit_tests/{integration => mock_server}/test_media.py (77%) rename airbyte-integrations/connectors/source-instagram/unit_tests/{integration => mock_server}/test_media_insights.py (73%) rename airbyte-integrations/connectors/source-instagram/unit_tests/{integration => mock_server}/test_stories.py (66%) create mode 100644 airbyte-integrations/connectors/source-instagram/unit_tests/mock_server/test_story_insights.py create mode 100644 airbyte-integrations/connectors/source-instagram/unit_tests/mock_server/test_user_insights.py rename airbyte-integrations/connectors/source-instagram/unit_tests/{integration => mock_server}/test_user_lifetime_insights.py (51%) rename airbyte-integrations/connectors/source-instagram/unit_tests/{integration => mock_server}/test_users.py (61%) rename airbyte-integrations/connectors/source-instagram/unit_tests/{integration => mock_server}/utils.py (89%) diff --git a/airbyte-integrations/connectors/source-instagram/unit_tests/integration/test_story_insights.py b/airbyte-integrations/connectors/source-instagram/unit_tests/integration/test_story_insights.py deleted file mode 100644 index 3b3dd3789af..00000000000 --- a/airbyte-integrations/connectors/source-instagram/unit_tests/integration/test_story_insights.py +++ /dev/null @@ -1,158 +0,0 @@ -# -# Copyright (c) 2024 Airbyte, Inc., all rights reserved. 
-# -import json -import unittest -from unittest import TestCase - -import pytest - -from airbyte_cdk.models import SyncMode -from airbyte_cdk.test.entrypoint_wrapper import EntrypointOutput -from airbyte_cdk.test.mock_http import HttpMocker, HttpResponse -from airbyte_cdk.test.mock_http.response_builder import ( - FieldPath, - HttpResponseBuilder, - RecordBuilder, - create_record_builder, - create_response_builder, - find_template, -) - -from .config import BUSINESS_ACCOUNT_ID, ConfigBuilder -from .pagination import NEXT_PAGE_TOKEN, InstagramPaginationStrategy -from .request_builder import RequestBuilder, get_account_request -from .response_builder import get_account_response -from .utils import config, read_output - - -PARENT_FIELDS = [ - "caption", - "id", - "ig_id", - "like_count", - "media_type", - "media_product_type", - "media_url", - "owner", - "permalink", - "shortcode", - "thumbnail_url", - "timestamp", - "username", -] -_PARENT_STREAM_NAME = "stories" -_STREAM_NAME = "story_insights" - -STORIES_ID = "3874523487643" -STORIES_ID_ERROR_CODE_10 = "3874523487644" - -HAPPY_PATH = "story_insights_happy_path" -ERROR_10 = "story_insights_error_code_10" - -_METRICS = ["reach", "replies", "follows", "profile_visits", "shares", "total_interactions"] - - -def _get_parent_request() -> RequestBuilder: - return RequestBuilder.get_stories_endpoint(item_id=BUSINESS_ACCOUNT_ID).with_limit(100).with_fields(PARENT_FIELDS) - - -def _get_child_request(media_id, metric) -> RequestBuilder: - return RequestBuilder.get_media_insights_endpoint(item_id=media_id).with_custom_param("metric", metric, with_format=True) - - -def _get_response(stream_name: str, test: str = None, with_pagination_strategy: bool = True) -> HttpResponseBuilder: - scenario = "" - if test: - scenario = f"_for_{test}" - kwargs = { - "response_template": find_template(f"{stream_name}{scenario}", __file__), - "records_path": FieldPath("data"), - "pagination_strategy": InstagramPaginationStrategy(request=_get_parent_request().build(), next_page_token=NEXT_PAGE_TOKEN), - } - if with_pagination_strategy: - kwargs["pagination_strategy"] = InstagramPaginationStrategy(request=_get_parent_request().build(), next_page_token=NEXT_PAGE_TOKEN) - - return create_response_builder(**kwargs) - - -def _record(stream_name: str, test: str = None) -> RecordBuilder: - scenario = "" - if test: - scenario = f"_for_{test}" - return create_record_builder( - response_template=find_template(f"{stream_name}{scenario}", __file__), - records_path=FieldPath("data"), - record_id_path=FieldPath("id"), - ) - - -class TestFullRefresh(TestCase): - @staticmethod - def _read(config_: ConfigBuilder, expecting_exception: bool = False) -> EntrypointOutput: - return read_output( - config_builder=config_, - stream_name=_STREAM_NAME, - sync_mode=SyncMode.full_refresh, - expecting_exception=expecting_exception, - ) - - @HttpMocker() - def test_instagram_story_insights(self, http_mocker: HttpMocker) -> None: - test = HAPPY_PATH - # Mocking API stream - http_mocker.get( - get_account_request().build(), - get_account_response(), - ) - # Mocking parent stream - http_mocker.get( - _get_parent_request().build(), - _get_response(stream_name=_PARENT_STREAM_NAME, test=test) - .with_record(_record(stream_name=_PARENT_STREAM_NAME, test=test)) - .build(), - ) - - http_mocker.get( - _get_child_request(media_id=STORIES_ID, metric=_METRICS).build(), - HttpResponse(json.dumps(find_template(f"{_STREAM_NAME}_for_{test}", __file__)), 200), - ) - - output = self._read(config_=config()) - assert 
len(output.records) == 1 - assert output.records[0].record.data["page_id"] - assert output.records[0].record.data["business_account_id"] - assert output.records[0].record.data["id"] - for metric in _METRICS: - assert metric in output.records[0].record.data - - @HttpMocker() - def test_instagram_story_insights_for_error_code_30(self, http_mocker: HttpMocker) -> None: - test = ERROR_10 - http_mocker.get( - get_account_request().build(), - get_account_response(), - ) - # Mocking parent stream - http_mocker.get( - _get_parent_request().build(), HttpResponse(json.dumps(find_template(f"{_PARENT_STREAM_NAME}_for_{test}", __file__)), 200) - ) - # Good response - http_mocker.get( - _get_child_request(media_id=STORIES_ID, metric=_METRICS).build(), - HttpResponse(json.dumps(find_template(f"{_STREAM_NAME}_for_{HAPPY_PATH}", __file__)), 200), - ) - # error 10 - http_mocker.get( - _get_child_request(media_id=STORIES_ID_ERROR_CODE_10, metric=_METRICS).build(), - HttpResponse(json.dumps(find_template(f"{_STREAM_NAME}_for_{test}", __file__)), 400), - ) - - output = self._read(config_=config()) - # error was ignored and correct record was processed - assert len(output.records) == 1 - assert output.records[0].record.data["page_id"] - assert output.records[0].record.data["business_account_id"] - assert output.records[0].record.data["id"] - for metric in _METRICS: - assert metric in output.records[0].record.data diff --git a/airbyte-integrations/connectors/source-instagram/unit_tests/integration/__init__.py b/airbyte-integrations/connectors/source-instagram/unit_tests/mock_server/__init__.py similarity index 100% rename from airbyte-integrations/connectors/source-instagram/unit_tests/integration/__init__.py rename to airbyte-integrations/connectors/source-instagram/unit_tests/mock_server/__init__.py diff --git a/airbyte-integrations/connectors/source-instagram/unit_tests/integration/config.py b/airbyte-integrations/connectors/source-instagram/unit_tests/mock_server/config.py similarity index 82% rename from airbyte-integrations/connectors/source-instagram/unit_tests/integration/config.py rename to airbyte-integrations/connectors/source-instagram/unit_tests/mock_server/config.py index ef3ea86c51d..30a7f6d78c4 100644 --- a/airbyte-integrations/connectors/source-instagram/unit_tests/integration/config.py +++ b/airbyte-integrations/connectors/source-instagram/unit_tests/mock_server/config.py @@ -24,5 +24,9 @@ class ConfigBuilder: "start_date": START_DATE, } + def with_start_date(self, start_date: str) -> "ConfigBuilder": + self._config["start_date"] = start_date + return self + def build(self) -> MutableMapping[str, Any]: return self._config diff --git a/airbyte-integrations/connectors/source-instagram/unit_tests/integration/pagination.py b/airbyte-integrations/connectors/source-instagram/unit_tests/mock_server/pagination.py similarity index 100% rename from airbyte-integrations/connectors/source-instagram/unit_tests/integration/pagination.py rename to airbyte-integrations/connectors/source-instagram/unit_tests/mock_server/pagination.py diff --git a/airbyte-integrations/connectors/source-instagram/unit_tests/integration/request_builder.py b/airbyte-integrations/connectors/source-instagram/unit_tests/mock_server/request_builder.py similarity index 89% rename from airbyte-integrations/connectors/source-instagram/unit_tests/integration/request_builder.py rename to airbyte-integrations/connectors/source-instagram/unit_tests/mock_server/request_builder.py index b3e27e10014..e8a9c14d6d4 100644 --- 
a/airbyte-integrations/connectors/source-instagram/unit_tests/integration/request_builder.py +++ b/airbyte-integrations/connectors/source-instagram/unit_tests/mock_server/request_builder.py @@ -6,7 +6,7 @@ from __future__ import annotations from typing import List, Optional, Union from airbyte_cdk.connector_builder.connector_builder_handler import resolve_manifest -from airbyte_cdk.test.mock_http.request import HttpRequest +from airbyte_cdk.test.mock_http.request import ANY_QUERY_PARAMS, HttpRequest from ..conftest import get_source from .config import ACCOUNTS_FIELDS @@ -81,6 +81,14 @@ class RequestBuilder: self._query_params[param] = value return self + def with_any_query_params(self) -> RequestBuilder: + """Set query params to ANY_QUERY_PARAMS to match any query parameters. + + This is useful for streams with dynamic query parameters like datetime cursors. + """ + self._query_params = ANY_QUERY_PARAMS + return self + @staticmethod def _get_formatted_fields(fields: List[str]) -> str: return ",".join(fields) diff --git a/airbyte-integrations/connectors/source-instagram/unit_tests/integration/response_builder.py b/airbyte-integrations/connectors/source-instagram/unit_tests/mock_server/response_builder.py similarity index 57% rename from airbyte-integrations/connectors/source-instagram/unit_tests/integration/response_builder.py rename to airbyte-integrations/connectors/source-instagram/unit_tests/mock_server/response_builder.py index c1da1fc6454..58c85bb2f10 100644 --- a/airbyte-integrations/connectors/source-instagram/unit_tests/integration/response_builder.py +++ b/airbyte-integrations/connectors/source-instagram/unit_tests/mock_server/response_builder.py @@ -27,3 +27,19 @@ def get_account_response() -> HttpResponse: "paging": {"cursors": {"before": "before_token"}}, } return build_response(body=response, status_code=HTTPStatus.OK) + + +SECOND_PAGE_ID = "333333333333333" +SECOND_BUSINESS_ACCOUNT_ID = "444444444444444" + + +def get_multiple_accounts_response() -> HttpResponse: + """Return a response with 2 accounts for testing substreams with multiple parent records.""" + response = { + "data": [ + {"id": PAGE_ID, "name": "AccountName", "instagram_business_account": {"id": BUSINESS_ACCOUNT_ID}}, + {"id": SECOND_PAGE_ID, "name": "SecondAccount", "instagram_business_account": {"id": SECOND_BUSINESS_ACCOUNT_ID}}, + ], + "paging": {"cursors": {"before": "before_token"}}, + } + return build_response(body=response, status_code=HTTPStatus.OK) diff --git a/airbyte-integrations/connectors/source-instagram/unit_tests/integration/test_api.py b/airbyte-integrations/connectors/source-instagram/unit_tests/mock_server/test_api.py similarity index 93% rename from airbyte-integrations/connectors/source-instagram/unit_tests/integration/test_api.py rename to airbyte-integrations/connectors/source-instagram/unit_tests/mock_server/test_api.py index 7e6f1f8461c..1105b823af2 100644 --- a/airbyte-integrations/connectors/source-instagram/unit_tests/integration/test_api.py +++ b/airbyte-integrations/connectors/source-instagram/unit_tests/mock_server/test_api.py @@ -67,6 +67,11 @@ class TestFullRefresh(TestCase): output = self._read(config_=config()) assert len(output.records) == 1 + # Verify transformations are applied (page_id, business_account_id in account field) + record = output.records[0].record.data + assert "account" in record + assert "page_id" in record["account"] + assert "business_account_id" in record["account"] @HttpMocker() def test_accounts_with_no_instagram_business_account_field(self, http_mocker: 
HttpMocker) -> None: diff --git a/airbyte-integrations/connectors/source-instagram/unit_tests/integration/test_media.py b/airbyte-integrations/connectors/source-instagram/unit_tests/mock_server/test_media.py similarity index 77% rename from airbyte-integrations/connectors/source-instagram/unit_tests/integration/test_media.py rename to airbyte-integrations/connectors/source-instagram/unit_tests/mock_server/test_media.py index 1f922158c27..41cccfd8156 100644 --- a/airbyte-integrations/connectors/source-instagram/unit_tests/integration/test_media.py +++ b/airbyte-integrations/connectors/source-instagram/unit_tests/mock_server/test_media.py @@ -20,7 +20,7 @@ from airbyte_cdk.test.mock_http.response_builder import ( from .config import BUSINESS_ACCOUNT_ID, ConfigBuilder from .pagination import NEXT_PAGE_TOKEN, InstagramPaginationStrategy from .request_builder import RequestBuilder, get_account_request -from .response_builder import get_account_response +from .response_builder import SECOND_BUSINESS_ACCOUNT_ID, get_account_response, get_multiple_accounts_response from .utils import config, read_output @@ -96,6 +96,13 @@ class TestFullRefresh(TestCase): output = self._read(config_=config()) assert len(output.records) == 1 + # Verify transformations are applied + record = output.records[0].record.data + assert "page_id" in record + assert "business_account_id" in record + assert "media_insights_info" in record + assert record["page_id"] is not None + assert record["business_account_id"] is not None @HttpMocker() def test_given_multiple_pages_when_read_then_return_records(self, http_mocker: HttpMocker) -> None: @@ -158,3 +165,29 @@ class TestFullRefresh(TestCase): assert "ig_id" in child assert "media_type" in child assert "owner" in child + + @HttpMocker() + def test_substream_with_multiple_parent_accounts(self, http_mocker: HttpMocker) -> None: + """Test media stream against 2+ parent accounts per playbook requirements.""" + http_mocker.get( + get_account_request().build(), + get_multiple_accounts_response(), + ) + # Mock media requests for both accounts + http_mocker.get( + _get_request().build(), + _get_response().with_record(_record()).build(), + ) + http_mocker.get( + RequestBuilder.get_media_endpoint(item_id=SECOND_BUSINESS_ACCOUNT_ID).with_limit(100).with_fields(_FIELDS).build(), + _get_response().with_record(_record()).build(), + ) + + output = self._read(config_=config()) + # Verify we get records from both accounts + assert len(output.records) == 2 + # Verify transformations on all records + for record in output.records: + assert "page_id" in record.record.data + assert "business_account_id" in record.record.data + assert "media_insights_info" in record.record.data diff --git a/airbyte-integrations/connectors/source-instagram/unit_tests/integration/test_media_insights.py b/airbyte-integrations/connectors/source-instagram/unit_tests/mock_server/test_media_insights.py similarity index 73% rename from airbyte-integrations/connectors/source-instagram/unit_tests/integration/test_media_insights.py rename to airbyte-integrations/connectors/source-instagram/unit_tests/mock_server/test_media_insights.py index 2426387e137..fdb78d8613a 100644 --- a/airbyte-integrations/connectors/source-instagram/unit_tests/integration/test_media_insights.py +++ b/airbyte-integrations/connectors/source-instagram/unit_tests/mock_server/test_media_insights.py @@ -267,9 +267,87 @@ class TestFullRefresh(TestCase): assert output.records[0].record.data["id"] for metric in _METRICS[MEDIA_ID_GENERAL_MEDIA]: assert metric 
in output.records[0].record.data + # For IGNORE handlers, verify no ERROR logs are produced + assert not any(log.log.level == "ERROR" for log in output.logs) + + @HttpMocker() + def test_substream_with_multiple_parent_records(self, http_mocker: HttpMocker) -> None: + """Test media_insights substream against 2+ parent records per playbook requirements.""" + http_mocker.get( + get_account_request().build(), + get_account_response(), + ) + # Mock parent stream returning 2 media records (reels and general_media) + parent_response = { + "data": [ + { + "caption": "a caption", + "comments_count": 2, + "id": MEDIA_ID_REELS, + "ig_id": "3123724930722523505", + "is_comment_enabled": True, + "like_count": 12, + "media_type": "VIDEO", + "media_product_type": "REELS", + "media_url": "https://fakecontent.com/path/to/content", + "owner": {"id": "41408147298757123"}, + "permalink": "https://instagram.com/permalink/123", + "shortcode": "HGagdsy38", + "thumbnail_url": "https://fakecontent.cdninstagram.com/v/somepath/", + "timestamp": "2023-06-12T19:20:02+0000", + "username": "username", + }, + { + "caption": "another caption", + "comments_count": 0, + "id": MEDIA_ID_GENERAL_MEDIA, + "ig_id": "2034885879374760912", + "is_comment_enabled": True, + "like_count": 52, + "media_type": "IMAGE", + "media_product_type": "FEED", + "media_url": "https://fakecontent.com/path/to/content2", + "owner": {"id": "41408147298757123"}, + "permalink": "https://instagram.com/permalink/456", + "shortcode": "ABC123", + "timestamp": "2019-05-02T11:42:01+0000", + "username": "username", + }, + ], + "paging": {"cursors": {"before": "cursor123"}}, + } + http_mocker.get( + _get_parent_request().build(), + HttpResponse(json.dumps(parent_response), 200), + ) + + # Mock child requests for both parent records + http_mocker.get( + _get_child_request(media_id=MEDIA_ID_REELS, metric=_METRICS[MEDIA_ID_REELS]).build(), + HttpResponse(json.dumps(find_template(f"{_STREAM_NAME}_for_{REELS}", __file__)), 200), + ) + http_mocker.get( + _get_child_request(media_id=MEDIA_ID_GENERAL_MEDIA, metric=_METRICS[MEDIA_ID_GENERAL_MEDIA]).build(), + HttpResponse(json.dumps(find_template(f"{_STREAM_NAME}_for_{GENERAL_MEDIA}", __file__)), 200), + ) + + output = self._read(config_=config()) + # Verify we get records from both parent records + assert len(output.records) == 2 + record_ids = {r.record.data["id"] for r in output.records} + assert MEDIA_ID_REELS in record_ids + assert MEDIA_ID_GENERAL_MEDIA in record_ids + # Verify transformations on all records + for record in output.records: + assert record.record.data["page_id"] + assert record.record.data["business_account_id"] @HttpMocker() def test_instagram_insights_error_posted_before_business(self, http_mocker: HttpMocker) -> None: + """Test that error_subcode 2108006 (posted before business conversion) is gracefully ignored. + + Verifies both error code and error message assertion per playbook requirements. 
+ """ test = ERROR_POSTED_BEFORE_BUSINESS http_mocker.get( get_account_request().build(), @@ -298,9 +376,18 @@ class TestFullRefresh(TestCase): assert output.records[0].record.data["id"] for metric in _METRICS[MEDIA_ID_GENERAL_MEDIA]: assert metric in output.records[0].record.data + assert not any(log.log.level == "ERROR" for log in output.logs) + log_messages = [log.log.message for log in output.logs] + assert any( + "Insights error for business_account_id" in msg for msg in log_messages + ), f"Expected 'Insights error for business_account_id' in logs but got: {log_messages}" @HttpMocker() def test_instagram_insights_error_with_wrong_permissions(self, http_mocker: HttpMocker) -> None: + """Test that error code 100 with subcode 33 (wrong permissions) is gracefully ignored. + + Verifies both error code and error message assertion per playbook requirements. + """ test = ERROR_WITH_WRONG_PERMISSIONS http_mocker.get( get_account_request().build(), @@ -323,16 +410,24 @@ class TestFullRefresh(TestCase): ) output = self._read(config_=config()) - # error was ignored and correct record was processed assert len(output.records) == 1 assert output.records[0].record.data["page_id"] assert output.records[0].record.data["business_account_id"] assert output.records[0].record.data["id"] for metric in _METRICS[MEDIA_ID_GENERAL_MEDIA]: assert metric in output.records[0].record.data + assert not any(log.log.level == "ERROR" for log in output.logs) + log_messages = [log.log.message for log in output.logs] + assert any( + "Check provided permissions for" in msg for msg in log_messages + ), f"Expected 'Check provided permissions for' in logs but got: {log_messages}" @HttpMocker() def test_instagram_insights_error_with_wrong_permissions_code_10(self, http_mocker: HttpMocker) -> None: + """Test that error code 10 with permission denied message is gracefully ignored. + + Verifies both error code and error message assertion per playbook requirements. 
+ """ test = ERROR_WITH_WRONG_PERMISSIONS_CODE_10 http_mocker.get( get_account_request().build(), @@ -355,10 +450,14 @@ class TestFullRefresh(TestCase): ) output = self._read(config_=config()) - # error was ignored and correct record was processed assert len(output.records) == 1 assert output.records[0].record.data["page_id"] assert output.records[0].record.data["business_account_id"] assert output.records[0].record.data["id"] for metric in _METRICS[MEDIA_ID_GENERAL_MEDIA]: assert metric in output.records[0].record.data + assert not any(log.log.level == "ERROR" for log in output.logs) + log_messages = [log.log.message for log in output.logs] + assert any( + "Check provided permissions for" in msg for msg in log_messages + ), f"Expected 'Check provided permissions for' in logs but got: {log_messages}" diff --git a/airbyte-integrations/connectors/source-instagram/unit_tests/integration/test_stories.py b/airbyte-integrations/connectors/source-instagram/unit_tests/mock_server/test_stories.py similarity index 66% rename from airbyte-integrations/connectors/source-instagram/unit_tests/integration/test_stories.py rename to airbyte-integrations/connectors/source-instagram/unit_tests/mock_server/test_stories.py index 1236e736c7b..54f0fa37fab 100644 --- a/airbyte-integrations/connectors/source-instagram/unit_tests/integration/test_stories.py +++ b/airbyte-integrations/connectors/source-instagram/unit_tests/mock_server/test_stories.py @@ -19,7 +19,7 @@ from airbyte_cdk.test.mock_http.response_builder import ( from .config import BUSINESS_ACCOUNT_ID, ConfigBuilder from .pagination import NEXT_PAGE_TOKEN, InstagramPaginationStrategy from .request_builder import RequestBuilder, get_account_request -from .response_builder import get_account_response +from .response_builder import SECOND_BUSINESS_ACCOUNT_ID, get_account_response, get_multiple_accounts_response from .utils import config, read_output @@ -85,6 +85,12 @@ class TestFullRefresh(TestCase): output = self._read(config_=config()) assert len(output.records) == 1 + # Verify transformations are applied (page_id, business_account_id, story_insights_info, timestamp) + record = output.records[0].record.data + assert "page_id" in record + assert "business_account_id" in record + assert "story_insights_info" in record + assert "timestamp" in record @HttpMocker() def test_given_multiple_pages_when_read_then_return_records(self, http_mocker: HttpMocker) -> None: @@ -104,3 +110,29 @@ class TestFullRefresh(TestCase): output = self._read(config_=config()) assert len(output.records) == 3 + + @HttpMocker() + def test_substream_with_multiple_parent_accounts(self, http_mocker: HttpMocker) -> None: + """Test stories stream against 2+ parent accounts per playbook requirements.""" + http_mocker.get( + get_account_request().build(), + get_multiple_accounts_response(), + ) + # Mock stories requests for both accounts + http_mocker.get( + _get_request().build(), + _get_response().with_record(_record()).build(), + ) + http_mocker.get( + RequestBuilder.get_stories_endpoint(item_id=SECOND_BUSINESS_ACCOUNT_ID).with_limit(100).with_fields(FIELDS).build(), + _get_response().with_record(_record()).build(), + ) + + output = self._read(config_=config()) + # Verify we get records from both accounts + assert len(output.records) == 2 + # Verify transformations on all records + for record in output.records: + assert "page_id" in record.record.data + assert "business_account_id" in record.record.data + assert "story_insights_info" in record.record.data diff --git 
a/airbyte-integrations/connectors/source-instagram/unit_tests/mock_server/test_story_insights.py b/airbyte-integrations/connectors/source-instagram/unit_tests/mock_server/test_story_insights.py new file mode 100644 index 00000000000..55687db4fda --- /dev/null +++ b/airbyte-integrations/connectors/source-instagram/unit_tests/mock_server/test_story_insights.py @@ -0,0 +1,284 @@ +# +# Copyright (c) 2024 Airbyte, Inc., all rights reserved. +# +import json +from unittest import TestCase + +from airbyte_cdk.models import SyncMode +from airbyte_cdk.test.entrypoint_wrapper import EntrypointOutput +from airbyte_cdk.test.mock_http import HttpMocker, HttpResponse +from airbyte_cdk.test.mock_http.response_builder import ( + FieldPath, + HttpResponseBuilder, + RecordBuilder, + create_record_builder, + create_response_builder, + find_template, +) + +from .config import BUSINESS_ACCOUNT_ID, ConfigBuilder +from .pagination import NEXT_PAGE_TOKEN, InstagramPaginationStrategy +from .request_builder import RequestBuilder, get_account_request +from .response_builder import get_account_response +from .utils import config, read_output + + +PARENT_FIELDS = [ + "caption", + "id", + "ig_id", + "like_count", + "media_type", + "media_product_type", + "media_url", + "owner", + "permalink", + "shortcode", + "thumbnail_url", + "timestamp", + "username", +] +_PARENT_STREAM_NAME = "stories" +_STREAM_NAME = "story_insights" + +STORIES_ID = "3874523487643" +STORIES_ID_ERROR_CODE_10 = "3874523487644" + +HAPPY_PATH = "story_insights_happy_path" +ERROR_10 = "story_insights_error_code_10" + +_METRICS = ["reach", "replies", "follows", "profile_visits", "shares", "total_interactions"] + + +def _get_parent_request() -> RequestBuilder: + return RequestBuilder.get_stories_endpoint(item_id=BUSINESS_ACCOUNT_ID).with_limit(100).with_fields(PARENT_FIELDS) + + +def _get_child_request(media_id, metric) -> RequestBuilder: + return RequestBuilder.get_media_insights_endpoint(item_id=media_id).with_custom_param("metric", metric, with_format=True) + + +def _get_response(stream_name: str, test: str = None, with_pagination_strategy: bool = True) -> HttpResponseBuilder: + scenario = "" + if test: + scenario = f"_for_{test}" + kwargs = { + "response_template": find_template(f"{stream_name}{scenario}", __file__), + "records_path": FieldPath("data"), + "pagination_strategy": InstagramPaginationStrategy(request=_get_parent_request().build(), next_page_token=NEXT_PAGE_TOKEN), + } + if with_pagination_strategy: + kwargs["pagination_strategy"] = InstagramPaginationStrategy(request=_get_parent_request().build(), next_page_token=NEXT_PAGE_TOKEN) + + return create_response_builder(**kwargs) + + +def _record(stream_name: str, test: str = None) -> RecordBuilder: + scenario = "" + if test: + scenario = f"_for_{test}" + return create_record_builder( + response_template=find_template(f"{stream_name}{scenario}", __file__), + records_path=FieldPath("data"), + record_id_path=FieldPath("id"), + ) + + +class TestFullRefresh(TestCase): + @staticmethod + def _read(config_: ConfigBuilder, expecting_exception: bool = False) -> EntrypointOutput: + return read_output( + config_builder=config_, + stream_name=_STREAM_NAME, + sync_mode=SyncMode.full_refresh, + expecting_exception=expecting_exception, + ) + + @HttpMocker() + def test_instagram_story_insights(self, http_mocker: HttpMocker) -> None: + test = HAPPY_PATH + # Mocking API stream + http_mocker.get( + get_account_request().build(), + get_account_response(), + ) + # Mocking parent stream + http_mocker.get( + 
_get_parent_request().build(), + _get_response(stream_name=_PARENT_STREAM_NAME, test=test) + .with_record(_record(stream_name=_PARENT_STREAM_NAME, test=test)) + .build(), + ) + + http_mocker.get( + _get_child_request(media_id=STORIES_ID, metric=_METRICS).build(), + HttpResponse(json.dumps(find_template(f"{_STREAM_NAME}_for_{test}", __file__)), 200), + ) + + output = self._read(config_=config()) + assert len(output.records) == 1 + assert output.records[0].record.data["page_id"] + assert output.records[0].record.data["business_account_id"] + assert output.records[0].record.data["id"] + for metric in _METRICS: + assert metric in output.records[0].record.data + + @HttpMocker() + def test_instagram_story_insights_for_error_code_30(self, http_mocker: HttpMocker) -> None: + """Test that error code 10 is gracefully ignored. + + Verifies both error code and error message assertion per playbook requirements. + """ + test = ERROR_10 + http_mocker.get( + get_account_request().build(), + get_account_response(), + ) + # Mocking parent stream + http_mocker.get( + _get_parent_request().build(), HttpResponse(json.dumps(find_template(f"{_PARENT_STREAM_NAME}_for_{test}", __file__)), 200) + ) + # Good response + http_mocker.get( + _get_child_request(media_id=STORIES_ID, metric=_METRICS).build(), + HttpResponse(json.dumps(find_template(f"{_STREAM_NAME}_for_{HAPPY_PATH}", __file__)), 200), + ) + # error 10 + http_mocker.get( + _get_child_request(media_id=STORIES_ID_ERROR_CODE_10, metric=_METRICS).build(), + HttpResponse(json.dumps(find_template(f"{_STREAM_NAME}_for_{test}", __file__)), 400), + ) + + output = self._read(config_=config()) + assert len(output.records) == 1 + assert output.records[0].record.data["page_id"] + assert output.records[0].record.data["business_account_id"] + assert output.records[0].record.data["id"] + for metric in _METRICS: + assert metric in output.records[0].record.data + assert not any(log.log.level == "ERROR" for log in output.logs) + log_messages = [log.log.message for log in output.logs] + assert any("Insights error" in msg for msg in log_messages), f"Expected 'Insights error' in logs but got: {log_messages}" + + @HttpMocker() + def test_substream_with_multiple_parent_records(self, http_mocker: HttpMocker) -> None: + """Test story_insights substream against 2+ parent records per playbook requirements.""" + STORIES_ID_2 = "3874523487645" + http_mocker.get( + get_account_request().build(), + get_account_response(), + ) + # Mock parent stream returning 2 story records + parent_response = { + "data": [ + { + "id": STORIES_ID, + "ig_id": "ig_id_1", + "like_count": 0, + "media_type": "VIDEO", + "media_product_type": "STORY", + "media_url": "https://fakecontent.cdninstagram.com/path1/path2/some_value", + "owner": {"id": "owner_id"}, + "permalink": "https://placeholder.com/stories/username/some_id_value", + "shortcode": "ERUY34867_3", + "thumbnail_url": "https://content.cdnfaker.com/path1/path2/some_value", + "timestamp": "2024-06-17T19:39:18+0000", + "username": "username", + }, + { + "id": STORIES_ID_2, + "ig_id": "ig_id_2", + "like_count": 5, + "media_type": "IMAGE", + "media_product_type": "STORY", + "media_url": "https://fakecontent.cdninstagram.com/path1/path2/another_value", + "owner": {"id": "owner_id"}, + "permalink": "https://placeholder.com/stories/username/another_id_value", + "shortcode": "XYZ98765_4", + "thumbnail_url": "https://content.cdnfaker.com/path1/path2/another_value", + "timestamp": "2024-06-18T10:15:30+0000", + "username": "username", + }, + ], + "paging": 
{"cursors": {"before": "cursor123"}}, + } + http_mocker.get( + _get_parent_request().build(), + HttpResponse(json.dumps(parent_response), 200), + ) + + # Mock child requests for both parent records + http_mocker.get( + _get_child_request(media_id=STORIES_ID, metric=_METRICS).build(), + HttpResponse(json.dumps(find_template(f"{_STREAM_NAME}_for_{HAPPY_PATH}", __file__)), 200), + ) + # Build response for second story with different ID + story_insights_response_2 = { + "data": [ + { + "name": "reach", + "period": "lifetime", + "values": [{"value": 150}], + "title": "Reach", + "description": "desc", + "id": f"{STORIES_ID_2}/insights/reach/lifetime", + }, + { + "name": "replies", + "period": "lifetime", + "values": [{"value": 3}], + "title": "Replies", + "description": "desc", + "id": f"{STORIES_ID_2}/insights/replies/lifetime", + }, + { + "name": "follows", + "period": "lifetime", + "values": [{"value": 2}], + "title": "Follows", + "description": "desc", + "id": f"{STORIES_ID_2}/insights/follows/lifetime", + }, + { + "name": "profile_visits", + "period": "lifetime", + "values": [{"value": 10}], + "title": "Profile Visits", + "description": "desc", + "id": f"{STORIES_ID_2}/insights/profile_visits/lifetime", + }, + { + "name": "shares", + "period": "lifetime", + "values": [{"value": 1}], + "title": "Shares", + "description": "desc", + "id": f"{STORIES_ID_2}/insights/shares/lifetime", + }, + { + "name": "total_interactions", + "period": "lifetime", + "values": [{"value": 16}], + "title": "Total Interactions", + "description": "desc", + "id": f"{STORIES_ID_2}/insights/total_interactions/lifetime", + }, + ] + } + http_mocker.get( + _get_child_request(media_id=STORIES_ID_2, metric=_METRICS).build(), + HttpResponse(json.dumps(story_insights_response_2), 200), + ) + + output = self._read(config_=config()) + # Verify we get records from both parent records + assert len(output.records) == 2 + record_ids = {r.record.data["id"] for r in output.records} + assert STORIES_ID in record_ids + assert STORIES_ID_2 in record_ids + # Verify transformations on all records + for record in output.records: + assert record.record.data["page_id"] + assert record.record.data["business_account_id"] + for metric in _METRICS: + assert metric in record.record.data diff --git a/airbyte-integrations/connectors/source-instagram/unit_tests/mock_server/test_user_insights.py b/airbyte-integrations/connectors/source-instagram/unit_tests/mock_server/test_user_insights.py new file mode 100644 index 00000000000..45ffac01b1c --- /dev/null +++ b/airbyte-integrations/connectors/source-instagram/unit_tests/mock_server/test_user_insights.py @@ -0,0 +1,400 @@ +# +# Copyright (c) 2024 Airbyte, Inc., all rights reserved. +# + +import json +from unittest import TestCase + +import freezegun + +from airbyte_cdk.models import SyncMode +from airbyte_cdk.test.entrypoint_wrapper import EntrypointOutput +from airbyte_cdk.test.mock_http import HttpMocker, HttpResponse +from airbyte_cdk.test.state_builder import StateBuilder + +from .config import BUSINESS_ACCOUNT_ID, PAGE_ID, ConfigBuilder +from .request_builder import RequestBuilder, get_account_request +from .response_builder import SECOND_BUSINESS_ACCOUNT_ID, SECOND_PAGE_ID, get_account_response, get_multiple_accounts_response +from .utils import read_output + + +_STREAM_NAME = "user_insights" + +_FROZEN_TIME = "2024-01-15T12:00:00Z" + + +def _get_user_insights_request_any_params(business_account_id: str) -> RequestBuilder: + """Create a request builder for user_insights with any query params. 
+ + The user_insights stream uses DatetimeBasedCursor with step P1D and QueryProperties + with 4 chunks (day/follower_count,reach; week/reach; days_28/reach; lifetime/online_followers). + This creates multiple time slices and query property combinations. + Using with_any_query_params() allows matching all these requests when the exact + parameters are not predictable or when testing behavior that doesn't depend on + specific request parameters. + """ + return RequestBuilder.get_user_lifetime_insights_endpoint(item_id=business_account_id).with_any_query_params() + + +def _get_user_insights_request_with_params(business_account_id: str, since: str, until: str, period: str, metric: str) -> RequestBuilder: + """Create a request builder for user_insights with specific query params.""" + return ( + RequestBuilder.get_user_lifetime_insights_endpoint(item_id=business_account_id) + .with_custom_param("since", since) + .with_custom_param("until", until) + .with_custom_param("period", period) + .with_custom_param("metric", metric) + ) + + +def _build_user_insights_response() -> HttpResponse: + """Build a successful user_insights response inline.""" + body = { + "data": [ + { + "name": "follower_count", + "period": "day", + "values": [{"value": 1000, "end_time": "2024-01-15T07:00:00+0000"}], + "title": "Follower Count", + "description": "Total number of followers", + "id": f"{BUSINESS_ACCOUNT_ID}/insights/follower_count/day", + }, + { + "name": "reach", + "period": "day", + "values": [{"value": 500, "end_time": "2024-01-15T07:00:00+0000"}], + "title": "Reach", + "description": "Total reach", + "id": f"{BUSINESS_ACCOUNT_ID}/insights/reach/day", + }, + ] + } + return HttpResponse(json.dumps(body), 200) + + +def _build_error_response(code: int, message: str, error_subcode: int = None) -> HttpResponse: + """Build an error response inline. + + Args: + code: The error code (e.g., 100, 10) + message: The error message + error_subcode: Optional error subcode (e.g., 2108006, 33) + """ + error = { + "message": message, + "type": "OAuthException", + "code": code, + "fbtrace_id": "ABC123", + } + if error_subcode is not None: + error["error_subcode"] = error_subcode + return HttpResponse(json.dumps({"error": error}), 400) + + +class TestFullRefresh(TestCase): + @staticmethod + def _read(config_: ConfigBuilder, expecting_exception: bool = False) -> EntrypointOutput: + return read_output( + config_builder=config_, + stream_name=_STREAM_NAME, + sync_mode=SyncMode.full_refresh, + expecting_exception=expecting_exception, + ) + + @HttpMocker() + @freezegun.freeze_time(_FROZEN_TIME) + def test_read_records_full_refresh(self, http_mocker: HttpMocker) -> None: + """Test full refresh sync for user_insights stream. + + The user_insights stream uses DatetimeBasedCursor with step P1D and QueryProperties + with multiple chunks. We set start_date close to frozen time to minimize time slices. + Using with_any_query_params() because the stream makes multiple requests with different + period/metric combinations that are determined by the QueryProperties configuration. 
+ """ + http_mocker.get( + get_account_request().build(), + get_account_response(), + ) + + http_mocker.get( + _get_user_insights_request_any_params(BUSINESS_ACCOUNT_ID).build(), + _build_user_insights_response(), + ) + + test_config = ConfigBuilder().with_start_date("2024-01-15T00:00:00Z") + output = self._read(config_=test_config) + assert len(output.records) == 1 + record = output.records[0].record.data + assert record.get("page_id") == PAGE_ID + assert record.get("business_account_id") == BUSINESS_ACCOUNT_ID + + @HttpMocker() + @freezegun.freeze_time(_FROZEN_TIME) + def test_substream_with_multiple_parent_accounts(self, http_mocker: HttpMocker) -> None: + """Test user_insights stream against 2+ parent accounts per playbook requirements. + + This test verifies that the stream correctly processes data from multiple parent accounts + and applies transformations (page_id, business_account_id) to records from each account. + """ + http_mocker.get( + get_account_request().build(), + get_multiple_accounts_response(), + ) + + # Mock user_insights requests for both accounts + http_mocker.get( + _get_user_insights_request_any_params(BUSINESS_ACCOUNT_ID).build(), + _build_user_insights_response(), + ) + http_mocker.get( + _get_user_insights_request_any_params(SECOND_BUSINESS_ACCOUNT_ID).build(), + _build_user_insights_response(), + ) + + test_config = ConfigBuilder().with_start_date("2024-01-15T00:00:00Z") + output = self._read(config_=test_config) + + # Verify we get records from both accounts + assert len(output.records) == 2 + + # Verify transformations on all records + business_account_ids = {record.record.data.get("business_account_id") for record in output.records} + assert BUSINESS_ACCOUNT_ID in business_account_ids + assert SECOND_BUSINESS_ACCOUNT_ID in business_account_ids + + for record in output.records: + assert "page_id" in record.record.data + assert record.record.data["page_id"] is not None + assert "business_account_id" in record.record.data + assert record.record.data["business_account_id"] is not None + + +class TestIncremental(TestCase): + @staticmethod + def _read( + config_: ConfigBuilder, + state: list = None, + expecting_exception: bool = False, + ) -> EntrypointOutput: + return read_output( + config_builder=config_, + stream_name=_STREAM_NAME, + sync_mode=SyncMode.incremental, + state=state, + expecting_exception=expecting_exception, + ) + + @HttpMocker() + @freezegun.freeze_time(_FROZEN_TIME) + def test_incremental_sync_first_sync_no_state(self, http_mocker: HttpMocker) -> None: + """Test incremental sync with no prior state (first sync). + + Using with_any_query_params() because without prior state, the stream starts from + start_date and creates multiple time slices with different period/metric combinations. + """ + http_mocker.get( + get_account_request().build(), + get_account_response(), + ) + + http_mocker.get( + _get_user_insights_request_any_params(BUSINESS_ACCOUNT_ID).build(), + _build_user_insights_response(), + ) + + test_config = ConfigBuilder().with_start_date("2024-01-15T00:00:00Z") + output = self._read(config_=test_config) + assert len(output.records) == 1 + assert len(output.state_messages) >= 1 + + @HttpMocker() + @freezegun.freeze_time(_FROZEN_TIME) + def test_incremental_sync_with_prior_state(self, http_mocker: HttpMocker) -> None: + """Test incremental sync with prior state (subsequent sync). + + With prior state at 2024-01-15T00:00:00+00:00 and frozen time at 2024-01-15T12:00:00Z, + the stream should request data with since=2024-01-15T00:00:00Z. 
+ We verify the outbound request includes the expected since parameter derived from state + by mocking specific query params for each QueryProperties chunk. + + The DatetimeBasedCursor uses the state value as the starting point, and the frozen time + determines the end datetime. With step P1D, there's only one time slice from state to now. + """ + prior_state_value = "2024-01-15T00:00:00+00:00" + # Expected since value derived from state - the API uses the state value format directly + expected_since = "2024-01-15T00:00:00+00:00" + # Expected until value is the frozen time (in the same format as the API expects) + expected_until = "2024-01-15T12:00:00+00:00" + + state = ( + StateBuilder() + .with_stream_state( + _STREAM_NAME, + { + "states": [ + { + "partition": {"business_account_id": BUSINESS_ACCOUNT_ID}, + "cursor": {"date": prior_state_value}, + } + ] + }, + ) + .build() + ) + + http_mocker.get( + get_account_request().build(), + get_account_response(), + ) + + # Mock each QueryProperties chunk with specific params to validate the since parameter + # Chunk 1: period=day, metric=follower_count,reach + http_mocker.get( + _get_user_insights_request_with_params( + BUSINESS_ACCOUNT_ID, since=expected_since, until=expected_until, period="day", metric="follower_count,reach" + ).build(), + _build_user_insights_response(), + ) + # Chunk 2: period=week, metric=reach + http_mocker.get( + _get_user_insights_request_with_params( + BUSINESS_ACCOUNT_ID, since=expected_since, until=expected_until, period="week", metric="reach" + ).build(), + _build_user_insights_response(), + ) + # Chunk 3: period=days_28, metric=reach + http_mocker.get( + _get_user_insights_request_with_params( + BUSINESS_ACCOUNT_ID, since=expected_since, until=expected_until, period="days_28", metric="reach" + ).build(), + _build_user_insights_response(), + ) + # Chunk 4: period=lifetime, metric=online_followers + http_mocker.get( + _get_user_insights_request_with_params( + BUSINESS_ACCOUNT_ID, since=expected_since, until=expected_until, period="lifetime", metric="online_followers" + ).build(), + _build_user_insights_response(), + ) + + test_config = ConfigBuilder().with_start_date("2024-01-14T00:00:00Z") + output = self._read(config_=test_config, state=state) + + # With specific mocks for each chunk, we can now assert exact record count + # The merge strategy groups by date, and all chunks return the same date (2024-01-15T07:00:00+0000) + # so records should be merged into 1 record + assert len(output.records) == 1 + assert len(output.state_messages) >= 1 + + # Verify the record has the expected business_account_id + record = output.records[0].record.data + assert record.get("business_account_id") == BUSINESS_ACCOUNT_ID + + # Verify the record date matches the expected date from our response + # Note: The date is normalized to RFC 3339 format (+00:00) by the schema normalization + assert record.get("date") == "2024-01-15T07:00:00+00:00" + + +class TestErrorHandling(TestCase): + """Test error handling for user_insights stream. + + The user_insights stream has IGNORE error handlers for: + - error_subcode 2108006: "Insights error for business_account_id: {message}" + - code 100 with error_subcode 33: "Check provided permissions for: {message}" + - code 10 with specific permission message: "Check provided permissions for: {message}" + + For IGNORE handlers, we verify: + 1. No ERROR logs are produced + 2. The configured error_message appears in logs (proving the handler was triggered) + 3. 
Zero records are returned (graceful handling) + """ + + @staticmethod + def _read(config_: ConfigBuilder, expecting_exception: bool = False) -> EntrypointOutput: + return read_output( + config_builder=config_, + stream_name=_STREAM_NAME, + sync_mode=SyncMode.full_refresh, + expecting_exception=expecting_exception, + ) + + @HttpMocker() + @freezegun.freeze_time(_FROZEN_TIME) + def test_error_subcode_2108006_is_ignored(self, http_mocker: HttpMocker) -> None: + """Test that error_subcode 2108006 is gracefully ignored. + + Verifies both error code and error message assertion per playbook requirements. + """ + http_mocker.get( + get_account_request().build(), + get_account_response(), + ) + + error_message = "Invalid parameter" + http_mocker.get( + _get_user_insights_request_any_params(BUSINESS_ACCOUNT_ID).build(), + _build_error_response(code=100, message=error_message, error_subcode=2108006), + ) + + test_config = ConfigBuilder().with_start_date("2024-01-15T00:00:00Z") + output = self._read(config_=test_config) + assert len(output.records) == 0 + assert not any(log.log.level == "ERROR" for log in output.logs) + log_messages = [log.log.message for log in output.logs] + assert any( + "Insights error for business_account_id" in msg for msg in log_messages + ), f"Expected 'Insights error for business_account_id' in logs but got: {log_messages}" + + @HttpMocker() + @freezegun.freeze_time(_FROZEN_TIME) + def test_error_code_100_subcode_33_is_ignored(self, http_mocker: HttpMocker) -> None: + """Test that error code 100 with subcode 33 is gracefully ignored. + + Verifies both error code and error message assertion per playbook requirements. + """ + http_mocker.get( + get_account_request().build(), + get_account_response(), + ) + + error_message = "Unsupported get request" + http_mocker.get( + _get_user_insights_request_any_params(BUSINESS_ACCOUNT_ID).build(), + _build_error_response(code=100, message=error_message, error_subcode=33), + ) + + test_config = ConfigBuilder().with_start_date("2024-01-15T00:00:00Z") + output = self._read(config_=test_config) + assert len(output.records) == 0 + assert not any(log.log.level == "ERROR" for log in output.logs) + log_messages = [log.log.message for log in output.logs] + assert any( + "Check provided permissions for" in msg for msg in log_messages + ), f"Expected 'Check provided permissions for' in logs but got: {log_messages}" + + @HttpMocker() + @freezegun.freeze_time(_FROZEN_TIME) + def test_error_code_10_permission_denied_is_ignored(self, http_mocker: HttpMocker) -> None: + """Test that error code 10 with permission denied message is gracefully ignored. + + Verifies both error code and error message assertion per playbook requirements. 
+ """ + http_mocker.get( + get_account_request().build(), + get_account_response(), + ) + + error_message = "(#10) Application does not have permission for this action" + http_mocker.get( + _get_user_insights_request_any_params(BUSINESS_ACCOUNT_ID).build(), + _build_error_response(code=10, message=error_message), + ) + + test_config = ConfigBuilder().with_start_date("2024-01-15T00:00:00Z") + output = self._read(config_=test_config) + assert len(output.records) == 0 + assert not any(log.log.level == "ERROR" for log in output.logs) + log_messages = [log.log.message for log in output.logs] + assert any( + "Check provided permissions for" in msg for msg in log_messages + ), f"Expected 'Check provided permissions for' in logs but got: {log_messages}" diff --git a/airbyte-integrations/connectors/source-instagram/unit_tests/integration/test_user_lifetime_insights.py b/airbyte-integrations/connectors/source-instagram/unit_tests/mock_server/test_user_lifetime_insights.py similarity index 51% rename from airbyte-integrations/connectors/source-instagram/unit_tests/integration/test_user_lifetime_insights.py rename to airbyte-integrations/connectors/source-instagram/unit_tests/mock_server/test_user_lifetime_insights.py index e89c14a03fe..99d4afd5a03 100644 --- a/airbyte-integrations/connectors/source-instagram/unit_tests/integration/test_user_lifetime_insights.py +++ b/airbyte-integrations/connectors/source-instagram/unit_tests/mock_server/test_user_lifetime_insights.py @@ -18,7 +18,7 @@ from airbyte_cdk.test.mock_http.response_builder import ( from .config import BUSINESS_ACCOUNT_ID, ConfigBuilder from .request_builder import RequestBuilder, get_account_request -from .response_builder import get_account_response +from .response_builder import SECOND_BUSINESS_ACCOUNT_ID, get_account_response, get_multiple_accounts_response from .utils import config, read_output @@ -79,3 +79,47 @@ class TestFullRefresh(TestCase): output = self._read(config_=config()) # each breakdown should produce a record assert len(output.records) == 3 + # Verify transformation: breakdown, page_id, business_account_id, and metric fields are added + for record in output.records: + assert "breakdown" in record.record.data + assert "page_id" in record.record.data + assert "business_account_id" in record.record.data + assert "metric" in record.record.data + assert record.record.data["page_id"] is not None + assert record.record.data["business_account_id"] is not None + + @HttpMocker() + def test_substream_with_multiple_parent_accounts(self, http_mocker: HttpMocker) -> None: + """Test user_lifetime_insights stream against 2+ parent accounts per playbook requirements.""" + http_mocker.get( + get_account_request().build(), + get_multiple_accounts_response(), + ) + # Mock requests for both accounts (each account has 3 breakdowns) + for breakdown in ["city", "country", "age,gender"]: + # First account + http_mocker.get( + _get_request().with_custom_param("breakdown", breakdown).build(), + _get_response().with_record(_record()).build(), + ) + # Second account + http_mocker.get( + RequestBuilder.get_user_lifetime_insights_endpoint(item_id=SECOND_BUSINESS_ACCOUNT_ID) + .with_custom_param("metric", "follower_demographics") + .with_custom_param("period", "lifetime") + .with_custom_param("metric_type", "total_value") + .with_limit(100) + .with_custom_param("breakdown", breakdown) + .build(), + _get_response().with_record(_record()).build(), + ) + + output = self._read(config_=config()) + # 2 accounts × 3 breakdowns = 6 records + assert 
len(output.records) == 6 + # Verify transformations on all records + for record in output.records: + assert "breakdown" in record.record.data + assert "page_id" in record.record.data + assert "business_account_id" in record.record.data + assert "metric" in record.record.data diff --git a/airbyte-integrations/connectors/source-instagram/unit_tests/integration/test_users.py b/airbyte-integrations/connectors/source-instagram/unit_tests/mock_server/test_users.py similarity index 61% rename from airbyte-integrations/connectors/source-instagram/unit_tests/integration/test_users.py rename to airbyte-integrations/connectors/source-instagram/unit_tests/mock_server/test_users.py index 6653ce392a9..f09b3a76824 100644 --- a/airbyte-integrations/connectors/source-instagram/unit_tests/integration/test_users.py +++ b/airbyte-integrations/connectors/source-instagram/unit_tests/mock_server/test_users.py @@ -18,7 +18,7 @@ from airbyte_cdk.test.mock_http.response_builder import ( from .config import BUSINESS_ACCOUNT_ID, ConfigBuilder from .request_builder import RequestBuilder, get_account_request -from .response_builder import get_account_response +from .response_builder import SECOND_BUSINESS_ACCOUNT_ID, get_account_response, get_multiple_accounts_response from .utils import config, read_output @@ -80,3 +80,31 @@ class TestFullRefresh(TestCase): output = self._read(config_=config()) assert len(output.records) == 1 + # Verify transformation: page_id field is added from partition + assert "page_id" in output.records[0].record.data + assert output.records[0].record.data["page_id"] is not None + + @HttpMocker() + def test_substream_with_multiple_parent_accounts(self, http_mocker: HttpMocker) -> None: + """Test users stream against 2+ parent accounts per playbook requirements.""" + http_mocker.get( + get_account_request().build(), + get_multiple_accounts_response(), + ) + # Mock users requests for both accounts + http_mocker.get( + _get_request().build(), + _get_response().with_record(_record()).build(), + ) + http_mocker.get( + RequestBuilder.get_users_endpoint(item_id=SECOND_BUSINESS_ACCOUNT_ID).with_fields(_FIELDS).build(), + _get_response().with_record(_record()).build(), + ) + + output = self._read(config_=config()) + # Verify we get records from both accounts + assert len(output.records) == 2 + # Verify transformations on all records + for record in output.records: + assert "page_id" in record.record.data + assert record.record.data["page_id"] is not None diff --git a/airbyte-integrations/connectors/source-instagram/unit_tests/integration/utils.py b/airbyte-integrations/connectors/source-instagram/unit_tests/mock_server/utils.py similarity index 89% rename from airbyte-integrations/connectors/source-instagram/unit_tests/integration/utils.py rename to airbyte-integrations/connectors/source-instagram/unit_tests/mock_server/utils.py index b0c70b2bd46..d6eb97d7ced 100644 --- a/airbyte-integrations/connectors/source-instagram/unit_tests/integration/utils.py +++ b/airbyte-integrations/connectors/source-instagram/unit_tests/mock_server/utils.py @@ -30,4 +30,4 @@ def read_output( ) -> EntrypointOutput: _catalog = catalog(stream_name, sync_mode) _config = config_builder.build() - return read(get_source(config=_config), _config, _catalog, state, expecting_exception) + return read(get_source(config=_config, state=state), _config, _catalog, state, expecting_exception)
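
Usage sketch (illustrative only, not part of the patch): every module above follows the same mock-server pattern — stub the account discovery call, stub the stream endpoint, run a read through read_output, then assert on records and logs. The sketch below shows how the helpers introduced here (get_account_request, get_account_response, RequestBuilder.with_any_query_params, ConfigBuilder.with_start_date, read_output) are intended to compose for a stream with unpredictable query parameters such as user_insights. The class name, test name, and the empty-data response body are assumptions made only to keep the sketch minimal.

# Hypothetical sketch assuming it lives alongside the other modules in
# unit_tests/mock_server/; it reuses only helpers defined in this patch.
import json
from unittest import TestCase

from airbyte_cdk.models import SyncMode
from airbyte_cdk.test.mock_http import HttpMocker, HttpResponse

from .config import BUSINESS_ACCOUNT_ID, ConfigBuilder
from .request_builder import RequestBuilder, get_account_request
from .response_builder import get_account_response
from .utils import read_output


class TestUserInsightsSketch(TestCase):
    @HttpMocker()
    def test_minimal_read(self, http_mocker: HttpMocker) -> None:
        # Stub the page/account discovery request made before any stream request.
        http_mocker.get(get_account_request().build(), get_account_response())

        # Stub the insights endpoint; with_any_query_params() matches every
        # since/until/period/metric combination produced by the datetime cursor
        # and the QueryProperties chunks, so no exact parameters are needed.
        http_mocker.get(
            RequestBuilder.get_user_lifetime_insights_endpoint(item_id=BUSINESS_ACCOUNT_ID)
            .with_any_query_params()
            .build(),
            HttpResponse(json.dumps({"data": []}), 200),
        )

        # Read the stream and assert on the output; an empty payload yields no records.
        output = read_output(
            config_builder=ConfigBuilder().with_start_date("2024-01-15T00:00:00Z"),
            stream_name="user_insights",
            sync_mode=SyncMode.full_refresh,
        )
        assert len(output.records) == 0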