
🚨🚨 Source Facebook Marketing: update API to v19.0 (#35746)

Signed-off-by: Artem Inzhyyants <artem.inzhyyants@gmail.com>
Artem Inzhyyants
2024-03-06 10:44:12 +01:00
committed by GitHub
parent 1d9e5463c6
commit e10826ce83
24 changed files with 314 additions and 614 deletions
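
The change set below bumps the pinned facebook-business SDK, which is what fixes the Graph API version every request targets. As a quick post-upgrade check, a minimal sketch (assuming facebook-business 19.0.0 is installed, per the pyproject.toml change below):

```python
# Sketch: confirm the installed SDK targets Graph API v19.0.
# Assumes facebook-business==19.0.0 is installed (see pyproject.toml below).
from facebook_business.api import FacebookAdsApi

# The SDK hardcodes the Graph API version it was released against.
assert FacebookAdsApi.API_VERSION == "v19.0", FacebookAdsApi.API_VERSION
```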

View File

@@ -37,7 +37,7 @@ acceptance_tests:
incremental:
tests:
- config_path: "secrets/config.json"
timeout_seconds: 4800
timeout_seconds: 6000
future_state:
future_state_path: "integration_tests/future_state.json"
full_refresh:

View File

@@ -11,11 +11,11 @@
"order": 0,
"pattern_descriptor": "The Ad Account ID must be a number.",
"examples": ["111111111111111"],
"type": "array",
"minItems": 1,
"type": "array",
"items": {
"pattern": "^[0-9]+$",
"type": "string"
"type": "string",
"pattern": "^[0-9]+$"
},
"uniqueItems": true
},
@@ -175,6 +175,7 @@
"catalog_segment_value_omni_purchase_roas",
"catalog_segment_value_website_purchase_roas",
"clicks",
"conversion_lead_rate",
"conversion_rate_ranking",
"conversion_values",
"conversions",
@@ -185,6 +186,7 @@
"cost_per_action_type",
"cost_per_ad_click",
"cost_per_conversion",
"cost_per_conversion_lead",
"cost_per_dda_countby_convs",
"cost_per_estimated_ad_recallers",
"cost_per_inline_link_click",
@@ -229,6 +231,9 @@
"interactive_component_tap",
"labels",
"location",
"marketing_messages_cost_per_delivered",
"marketing_messages_cost_per_link_btn_click",
"marketing_messages_spend",
"mobile_app_purchase_roas",
"objective",
"optimization_goal",
@@ -238,9 +243,6 @@
"purchase_roas",
"qualifying_question_qualify_answer_rate",
"quality_ranking",
"quality_score_ectr",
"quality_score_ecvr",
"quality_score_organic",
"reach",
"social_spend",
"spend",
@@ -309,7 +311,16 @@
"image_asset",
"impression_device",
"is_conversion_id_modeled",
"landing_destination",
"link_url_asset",
"marketing_messages_btn_name",
"mdsa_landing_destination",
"media_asset_url",
"media_creator",
"media_destination_url",
"media_format",
"media_origin_url",
"media_text_content",
"mmm",
"place_page_id",
"platform_position",
@@ -320,6 +331,8 @@
"region",
"skan_campaign_id",
"skan_conversion_id",
"skan_version",
"standard_event_content_type",
"title_asset",
"video_asset"
]
@@ -343,7 +356,8 @@
"action_target_id",
"action_type",
"action_video_sound",
"action_video_type"
"action_video_type",
"standard_event_content_type"
]
}
},
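
The spec changes above add conversion-lead and marketing-message metrics plus new breakdowns, and drop deprecated quality_score_* and estimated_ad_recall* entries. For illustration, a hypothetical custom-insights config fragment (written as a Python dict; the custom_insights key and its shape follow the connector spec, all values are made up) selecting some of the newly added enum values:

```python
# Hypothetical config fragment exercising enum values added in this commit.
# Key names mirror the connector spec above; account/report names are invented.
config_patch = {
    "custom_insights": [
        {
            "name": "conversion_leads_report",
            "fields": ["clicks", "conversion_lead_rate", "cost_per_conversion_lead"],
            "breakdowns": ["skan_version"],
            "action_breakdowns": ["standard_event_content_type"],
        }
    ]
}
```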

View File

@@ -31,11 +31,7 @@ def configured_catalog_fixture(config) -> ConfiguredAirbyteCatalog:
streams = []
# Prefer incremental if available
for stream in catalog.streams:
sync_mode = (
SyncMode.incremental
if SyncMode.incremental in stream.supported_sync_modes
else SyncMode.full_refresh
)
sync_mode = SyncMode.incremental if SyncMode.incremental in stream.supported_sync_modes else SyncMode.full_refresh
streams.append(
ConfiguredAirbyteStream(
stream=stream,
@@ -56,9 +52,7 @@ class TestFacebookMarketingSource:
("ad_sets", "23846541706990398"),
],
)
def test_streams_with_include_deleted(
self, stream_name, deleted_id, config_with_include_deleted, configured_catalog
):
def test_streams_with_include_deleted(self, stream_name, deleted_id, config_with_include_deleted, configured_catalog):
catalog = self._slice_catalog(configured_catalog, {stream_name})
records, states = self._read_records(config_with_include_deleted, catalog)
deleted_records = list(filter(self._deleted_record, records))
@@ -67,16 +61,10 @@ class TestFacebookMarketingSource:
assert states, "incremental read should produce states"
for name, state in states[-1].state.data.items():
assert (
"filter_statuses" in state[account_id]
), f"State for {name} should include `filter_statuses` flag"
assert "filter_statuses" in state[account_id], f"State for {name} should include `filter_statuses` flag"
assert (
deleted_records
), f"{stream_name} stream should have deleted records returned"
assert (
is_specific_deleted_pulled
), f"{stream_name} stream should have a deleted record with id={deleted_id}"
assert deleted_records, f"{stream_name} stream should have deleted records returned"
assert is_specific_deleted_pulled, f"{stream_name} stream should have a deleted record with id={deleted_id}"
@pytest.mark.parametrize(
"stream_name, deleted_num, filter_statuses",
@@ -146,14 +134,10 @@ class TestFacebookMarketingSource:
value["filter_statuses"] = filter_statuses
catalog = self._slice_catalog(configured_catalog, {stream_name})
records, states = self._read_records(
config_with_include_deleted, catalog, state=state
)
records, states = self._read_records(config_with_include_deleted, catalog, state=state)
deleted_records = list(filter(self._deleted_record, records))
assert (
len(deleted_records) == deleted_num
), f"{stream_name} should have {deleted_num} deleted records returned"
assert len(deleted_records) == deleted_num, f"{stream_name} should have {deleted_num} deleted records returned"
@staticmethod
def _deleted_record(record: AirbyteMessage) -> bool:
@@ -164,9 +148,7 @@ class TestFacebookMarketingSource:
return str(record.record.data["id"])
@staticmethod
def _slice_catalog(
catalog: ConfiguredAirbyteCatalog, streams: Set[str]
) -> ConfiguredAirbyteCatalog:
def _slice_catalog(catalog: ConfiguredAirbyteCatalog, streams: Set[str]) -> ConfiguredAirbyteCatalog:
sliced_catalog = ConfiguredAirbyteCatalog(streams=[])
for stream in catalog.streams:
if stream.stream.name in streams:
@@ -174,14 +156,10 @@ class TestFacebookMarketingSource:
return sliced_catalog
@staticmethod
def _read_records(
conf, catalog, state=None
) -> Tuple[List[AirbyteMessage], List[AirbyteMessage]]:
def _read_records(conf, catalog, state=None) -> Tuple[List[AirbyteMessage], List[AirbyteMessage]]:
records = []
states = []
for message in SourceFacebookMarketing().read(
logging.getLogger("airbyte"), conf, catalog, state=state
):
for message in SourceFacebookMarketing().read(logging.getLogger("airbyte"), conf, catalog, state=state):
if message.type == Type.RECORD:
records.append(message)
elif message.type == Type.STATE:

View File

@@ -6,11 +6,11 @@ data:
hosts:
- graph.facebook.com
connectorBuildOptions:
baseImage: docker.io/airbyte/python-connector-base:1.1.0@sha256:bd98f6505c6764b1b5f99d3aedc23dfc9e9af631a62533f60eb32b1d3dbab20c
baseImage: docker.io/airbyte/python-connector-base:1.2.0@sha256:c22a9d97464b69d6ef01898edf3f8612dc11614f05a84984451dde195f337db9
connectorSubtype: api
connectorType: source
definitionId: e7778cfc-e97c-4458-9ecb-b4f2bba8946c
dockerImageTag: 1.4.2
dockerImageTag: 2.0.0
dockerRepository: airbyte/source-facebook-marketing
documentationUrl: https://docs.airbyte.com/integrations/sources/facebook-marketing
githubIssueLabel: source-facebook-marketing
@@ -27,6 +27,22 @@ data:
oss:
enabled: true
releaseStage: generally_available
releases:
breakingChanges:
2.0.0:
message: "All Ads-Insights-* streams now have updated schemas. Users will need to retest source confguration, refresh the source schema and reset affected streams after upgrading. For more information [visit](https://docs.airbyte.com/integrations/sources/facebook-marketing-migrations)"
upgradeDeadline: "2024-03-17"
scopedImpact:
- scopeType: stream
impactedScopes:
- "ads_insights"
- "ads_insights_age_and_gender"
- "ads_insights_action_type"
- "ads_insights_country"
- "ads_insights_platform_and_device"
- "ads_insights_region"
- "ads_insights_dma"
- "ads_insights_action_product_id"
suggestedStreams:
streams:
- ads_insights

View File

@@ -1,4 +1,4 @@
# This file is automatically @generated by Poetry 1.5.1 and should not be changed by hand.
# This file is automatically @generated by Poetry 1.7.1 and should not be changed by hand.
[[package]]
name = "aiohttp"
@@ -432,6 +432,18 @@ files = [
{file = "dpath-2.0.8.tar.gz", hash = "sha256:a3440157ebe80d0a3ad794f1b61c571bef125214800ffdb9afc9424e8250fe9b"},
]
[[package]]
name = "enum34"
version = "1.1.10"
description = "Python 3.4 Enum backported to 3.3, 3.2, 3.1, 2.7, 2.6, 2.5, and 2.4"
optional = false
python-versions = "*"
files = [
{file = "enum34-1.1.10-py2-none-any.whl", hash = "sha256:a98a201d6de3f2ab3db284e70a33b0f896fbf35f8086594e8c9e74b909058d53"},
{file = "enum34-1.1.10-py3-none-any.whl", hash = "sha256:c3858660960c984d6ab0ebad691265180da2b43f07e061c0f8dca9ef3cffd328"},
{file = "enum34-1.1.10.tar.gz", hash = "sha256:cce6a7477ed816bd2542d03d53db9f0db935dd013b70f336a95c73979289f248"},
]
[[package]]
name = "exceptiongroup"
version = "1.2.0"
@@ -448,18 +460,19 @@ test = ["pytest (>=6)"]
[[package]]
name = "facebook-business"
version = "17.0.0"
version = "19.0.0"
description = "Facebook Business SDK"
optional = false
python-versions = "*"
files = [
{file = "facebook_business-17.0.0-py3-none-any.whl", hash = "sha256:f4b87a940a068d94ace6dc2dde7e0d43602264da18375ebfb0a8059a48a47012"},
{file = "facebook_business-17.0.0.tar.gz", hash = "sha256:6a1c11185384325b49d640a7abb60e610b8f8561a8add1206d8e7e5f24626cf2"},
{file = "facebook_business-19.0.0-py3-none-any.whl", hash = "sha256:591deedc010cefeb49151bbfadf72659cf262056072b437ca3dbf0ba37b3fa43"},
{file = "facebook_business-19.0.0.tar.gz", hash = "sha256:e12ea2a13d1703922d1b5d3921bc67bd10176596770ce154f287019738775800"},
]
[package.dependencies]
aiohttp = {version = "*", markers = "python_version >= \"3.5.3\""}
curlify = ">=2.1.0"
enum34 = {version = "*", markers = "python_version >= \"3\""}
pycountry = ">=19.8.18"
requests = ">=2.3.0"
six = ">=1.7.3"
@@ -1507,4 +1520,4 @@ multidict = ">=4.0"
[metadata]
lock-version = "2.0"
python-versions = "^3.9,<3.12"
content-hash = "5753d144dc008fabd12b18d9e28d148ee96976d7b83cdcf0a82b3ea22f8f315f"
content-hash = "cac4564b0e204ad1f4b5d0d3abce8cb436e80193351a8253cf3c27b677ee908e"

View File

@@ -3,7 +3,7 @@ requires = [ "poetry-core>=1.0.0",]
build-backend = "poetry.core.masonry.api"
[tool.poetry]
version = "1.4.2"
version = "2.0.0"
name = "source-facebook-marketing"
description = "Source implementation for Facebook Marketing."
authors = [ "Airbyte <contact@airbyte.io>",]
@@ -18,9 +18,8 @@ include = "source_facebook_marketing"
[tool.poetry.dependencies]
python = "^3.9,<3.12"
airbyte-cdk = "==0.62.1"
facebook-business = "==17.0.0"
facebook-business = "19.0.0"
cached-property = "==1.5.2"
pendulum = "==2.1.2"
[tool.poetry.scripts]
source-facebook-marketing = "source_facebook_marketing.run:run"
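
pyproject.toml and poetry.lock move together here (note the regenerated content-hash above). A small runtime sanity check, sketched under the assumption of a Python 3.9-3.11 environment matching the constraint above, that the resolved dependency equals the new pin:

```python
# Sketch: verify the environment resolved the new facebook-business pin.
from importlib.metadata import version  # stdlib since Python 3.8

assert version("facebook-business") == "19.0.0", version("facebook-business")
```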

View File

@@ -33,9 +33,6 @@
"adset_name": {
"type": ["null", "string"]
},
"age_targeting": {
"type": ["null", "string"]
},
"attribution_setting": {
"type": ["null", "string"]
},
@@ -81,6 +78,9 @@
"clicks": {
"type": ["null", "integer"]
},
"conversion_lead_rate": {
"type": ["null", "number"]
},
"conversion_rate_ranking": {
"type": ["null", "string"]
},
@@ -111,6 +111,9 @@
"cost_per_conversion": {
"$ref": "ads_action_stats.json"
},
"cost_per_conversion_lead": {
"type": ["null", "number"]
},
"cost_per_estimated_ad_recallers": {
"type": ["null", "number"]
},
@@ -165,24 +168,9 @@
"engagement_rate_ranking": {
"type": ["null", "string"]
},
"estimated_ad_recall_rate": {
"type": ["null", "number"]
},
"estimated_ad_recall_rate_lower_bound": {
"type": ["null", "number"]
},
"estimated_ad_recall_rate_upper_bound": {
"type": ["null", "number"]
},
"estimated_ad_recallers": {
"type": ["null", "number"]
},
"estimated_ad_recallers_lower_bound": {
"type": ["null", "number"]
},
"estimated_ad_recallers_upper_bound": {
"type": ["null", "number"]
},
"frequency": {
"type": ["null", "number"]
},
@@ -192,9 +180,6 @@
"full_view_reach": {
"type": ["null", "number"]
},
"gender_targeting": {
"type": ["null", "string"]
},
"impressions": {
"type": ["null", "integer"]
},
@@ -216,12 +201,6 @@
"instant_experience_outbound_clicks": {
"$ref": "ads_action_stats.json"
},
"labels": {
"type": ["null", "string"]
},
"location": {
"type": ["null", "string"]
},
"mobile_app_purchase_roas": {
"$ref": "ads_action_stats.json"
},

View File

@@ -70,8 +70,7 @@ def api_fixture(some_config, requests_mock, fb_account_response):
)
requests_mock.register_uri(
"GET",
FacebookSession.GRAPH
+ f"/{FB_API_VERSION}/act_{some_config['account_ids'][0]}/",
FacebookSession.GRAPH + f"/{FB_API_VERSION}/act_{some_config['account_ids'][0]}/",
[fb_account_response],
)
return api

View File

@@ -70,7 +70,7 @@ class RequestBuilder:
def build(self) -> HttpRequest:
return HttpRequest(
url=f"https://graph.facebook.com/v17.0/{self._account_sub_path()}{self._resource}",
url=f"https://graph.facebook.com/v19.0/{self._account_sub_path()}{self._resource}",
query_params=self._query_params,
body=self._body,
)
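
This request builder hardcodes the new version string in the mocked URL. Other tests in this commit derive it from the SDK instead (FB_API_VERSION = FacebookAdsApi.API_VERSION), which keeps mocks in step with future SDK bumps; a sketch of that approach, with an invented account id:

```python
# Sketch: derive the Graph API version from the SDK pin instead of hardcoding it.
from facebook_business.api import FacebookAdsApi

FB_API_VERSION = FacebookAdsApi.API_VERSION  # "v19.0" with facebook-business 19.0.0
url = f"https://graph.facebook.com/{FB_API_VERSION}/act_123123123/ads"
```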

View File

@@ -63,7 +63,6 @@ def _job_start_request(
"ad_name",
"adset_id",
"adset_name",
"age_targeting",
"attribution_setting",
"auction_bid",
"auction_competitiveness",
@@ -79,6 +78,7 @@ def _job_start_request(
"catalog_segment_value_omni_purchase_roas",
"catalog_segment_value_website_purchase_roas",
"clicks",
"conversion_lead_rate",
"conversion_rate_ranking",
"conversion_values",
"conversions",
@@ -89,6 +89,7 @@ def _job_start_request(
"cost_per_action_type",
"cost_per_ad_click",
"cost_per_conversion",
"cost_per_conversion_lead",
"cost_per_estimated_ad_recallers",
"cost_per_inline_link_click",
"cost_per_inline_post_engagement",
@@ -106,16 +107,10 @@ def _job_start_request(
"date_start",
"date_stop",
"engagement_rate_ranking",
"estimated_ad_recall_rate",
"estimated_ad_recall_rate_lower_bound",
"estimated_ad_recall_rate_upper_bound",
"estimated_ad_recallers",
"estimated_ad_recallers_lower_bound",
"estimated_ad_recallers_upper_bound",
"frequency",
"full_view_impressions",
"full_view_reach",
"gender_targeting",
"impressions",
"inline_link_click_ctr",
"inline_link_clicks",
@@ -123,8 +118,6 @@ def _job_start_request(
"instant_experience_clicks_to_open",
"instant_experience_clicks_to_start",
"instant_experience_outbound_clicks",
"labels",
"location",
"mobile_app_purchase_roas",
"objective",
"optimization_goal",
@@ -168,9 +161,7 @@ def _job_start_request(
"action_attribution_windows": ["1d_click", "7d_click", "28d_click", "1d_view", "7d_view", "28d_view"],
"time_range": {"since": since, "until": until},
}
return RequestBuilder.get_insights_endpoint(access_token=ACCESS_TOKEN, account_id=account_id).with_body(
encode_request_body(body)
)
return RequestBuilder.get_insights_endpoint(access_token=ACCESS_TOKEN, account_id=account_id).with_body(encode_request_body(body))
def _job_status_request(report_run_ids: Union[str, List[str]]) -> RequestBuilder:
@@ -206,12 +197,9 @@ def _job_status_response(
job_ids = [job_ids]
body = [
{
"body": json.dumps(
{
"id": job_id, "account_id": account_id, "async_status": status, "async_percent_completion": 100
}
),
} for job_id in job_ids
"body": json.dumps({"id": job_id, "account_id": account_id, "async_status": status, "async_percent_completion": 100}),
}
for job_id in job_ids
]
return build_response(body=body, status_code=HTTPStatus.OK)
@@ -236,7 +224,6 @@ def _ads_insights_action_product_id_record() -> RecordBuilder:
@freezegun.freeze_time(NOW.isoformat())
class TestFullRefresh(TestCase):
@staticmethod
def _read(config_: ConfigBuilder, expecting_exception: bool = False) -> EntrypointOutput:
return read_output(
@@ -272,9 +259,7 @@ class TestFullRefresh(TestCase):
_insights_response().with_record(_ads_insights_action_product_id_record()).build(),
)
output = self._read(
config().with_account_ids([client_side_account_id]).with_start_date(start_date).with_end_date(end_date)
)
output = self._read(config().with_account_ids([client_side_account_id]).with_start_date(start_date).with_end_date(end_date))
assert len(output.records) == 1
@HttpMocker()
@@ -289,9 +274,10 @@ class TestFullRefresh(TestCase):
)
http_mocker.get(
_get_insights_request(_JOB_ID).with_next_page_token(NEXT_PAGE_TOKEN).build(),
_insights_response().with_record(_ads_insights_action_product_id_record()).with_record(
_ads_insights_action_product_id_record()
).build(),
_insights_response()
.with_record(_ads_insights_action_product_id_record())
.with_record(_ads_insights_action_product_id_record())
.build(),
)
output = self._read(config())
@@ -330,15 +316,9 @@ class TestFullRefresh(TestCase):
http_mocker.get(get_account_request().build(), get_account_response())
http_mocker.get(_update_api_throttle_limit_request().build(), _update_api_throttle_limit_response())
http_mocker.post(
_job_start_request(since=start_date, until=start_date).build(), _job_start_response(report_run_id_1)
)
http_mocker.post(
_job_start_request(since=end_date, until=end_date).build(), _job_start_response(report_run_id_2)
)
http_mocker.post(
_job_status_request([report_run_id_1, report_run_id_2]).build(), _job_status_response([job_id_1, job_id_2])
)
http_mocker.post(_job_start_request(since=start_date, until=start_date).build(), _job_start_response(report_run_id_1))
http_mocker.post(_job_start_request(since=end_date, until=end_date).build(), _job_start_response(report_run_id_2))
http_mocker.post(_job_status_request([report_run_id_1, report_run_id_2]).build(), _job_status_response([job_id_1, job_id_2]))
http_mocker.get(
_get_insights_request(job_id_1).build(),
_insights_response().with_record(_ads_insights_action_product_id_record()).build(),
@@ -352,9 +332,7 @@ class TestFullRefresh(TestCase):
assert len(output.records) == 2
@HttpMocker()
def test_given_multiple_account_ids_when_read_then_return_records_from_all_accounts(
self, http_mocker: HttpMocker
) -> None:
def test_given_multiple_account_ids_when_read_then_return_records_from_all_accounts(self, http_mocker: HttpMocker) -> None:
account_id_1 = "123123123"
account_id_2 = "321321321"
report_run_id_1 = "1571860060019500"
@@ -364,35 +342,19 @@ class TestFullRefresh(TestCase):
api_throttle_limit_response = _update_api_throttle_limit_response()
http_mocker.get(
get_account_request().with_account_id(account_id_1).build(), get_account_response(account_id=account_id_1)
)
http_mocker.get(
_update_api_throttle_limit_request().with_account_id(account_id_1).build(), api_throttle_limit_response
)
http_mocker.post(
_job_start_request().with_account_id(account_id_1).build(), _job_start_response(report_run_id_1)
)
http_mocker.post(
_job_status_request(report_run_id_1).build(), _job_status_response(job_id_1, account_id=account_id_1)
)
http_mocker.get(get_account_request().with_account_id(account_id_1).build(), get_account_response(account_id=account_id_1))
http_mocker.get(_update_api_throttle_limit_request().with_account_id(account_id_1).build(), api_throttle_limit_response)
http_mocker.post(_job_start_request().with_account_id(account_id_1).build(), _job_start_response(report_run_id_1))
http_mocker.post(_job_status_request(report_run_id_1).build(), _job_status_response(job_id_1, account_id=account_id_1))
http_mocker.get(
_get_insights_request(job_id_1).build(),
_insights_response().with_record(_ads_insights_action_product_id_record()).build(),
)
http_mocker.get(
get_account_request().with_account_id(account_id_2).build(), get_account_response(account_id=account_id_2)
)
http_mocker.get(
_update_api_throttle_limit_request().with_account_id(account_id_2).build(), api_throttle_limit_response
)
http_mocker.post(
_job_start_request().with_account_id(account_id_2).build(), _job_start_response(report_run_id_2)
)
http_mocker.post(
_job_status_request(report_run_id_2).build(), _job_status_response(job_id_2, account_id=account_id_2)
)
http_mocker.get(get_account_request().with_account_id(account_id_2).build(), get_account_response(account_id=account_id_2))
http_mocker.get(_update_api_throttle_limit_request().with_account_id(account_id_2).build(), api_throttle_limit_response)
http_mocker.post(_job_start_request().with_account_id(account_id_2).build(), _job_start_response(report_run_id_2))
http_mocker.post(_job_status_request(report_run_id_2).build(), _job_status_response(job_id_2, account_id=account_id_2))
http_mocker.get(
_get_insights_request(job_id_2).build(),
_insights_response().with_record(_ads_insights_action_product_id_record()).build(),
@@ -436,16 +398,12 @@ class TestIncremental(TestCase):
)
@HttpMocker()
def test_when_read_then_state_message_produced_and_state_match_start_interval(
self, http_mocker: HttpMocker
) -> None:
def test_when_read_then_state_message_produced_and_state_match_start_interval(self, http_mocker: HttpMocker) -> None:
account_id = "123123123"
start_date = NOW.set(hour=0, minute=0, second=0)
end_date = NOW.set(hour=23, minute=59, second=59)
http_mocker.get(
get_account_request().with_account_id(account_id).build(), get_account_response(account_id=account_id)
)
http_mocker.get(get_account_request().with_account_id(account_id).build(), get_account_response(account_id=account_id))
http_mocker.get(
_update_api_throttle_limit_request().with_account_id(account_id).build(),
_update_api_throttle_limit_response(),
@@ -454,18 +412,14 @@ class TestIncremental(TestCase):
_job_start_request(since=start_date, until=end_date).with_account_id(account_id).build(),
_job_start_response(_REPORT_RUN_ID),
)
http_mocker.post(
_job_status_request(_REPORT_RUN_ID).build(), _job_status_response(_JOB_ID, account_id=account_id)
)
http_mocker.post(_job_status_request(_REPORT_RUN_ID).build(), _job_status_response(_JOB_ID, account_id=account_id))
http_mocker.get(
_get_insights_request(_JOB_ID).build(),
_insights_response().with_record(_ads_insights_action_product_id_record()).build(),
)
output = self._read(config().with_account_ids([account_id]).with_start_date(start_date).with_end_date(end_date))
cursor_value_from_state_message = output.most_recent_state.get(_STREAM_NAME, {}).get(account_id, {}).get(
_CURSOR_FIELD
)
cursor_value_from_state_message = output.most_recent_state.get(_STREAM_NAME, {}).get(account_id, {}).get(_CURSOR_FIELD)
assert cursor_value_from_state_message == start_date.strftime(DATE_FORMAT)
@HttpMocker()
@@ -483,51 +437,33 @@ class TestIncremental(TestCase):
api_throttle_limit_response = _update_api_throttle_limit_response()
http_mocker.get(
get_account_request().with_account_id(account_id_1).build(), get_account_response(account_id=account_id_1)
)
http_mocker.get(
_update_api_throttle_limit_request().with_account_id(account_id_1).build(), api_throttle_limit_response
)
http_mocker.get(get_account_request().with_account_id(account_id_1).build(), get_account_response(account_id=account_id_1))
http_mocker.get(_update_api_throttle_limit_request().with_account_id(account_id_1).build(), api_throttle_limit_response)
http_mocker.post(
_job_start_request(since=start_date, until=end_date).with_account_id(account_id_1).build(),
_job_start_response(report_run_id_1),
)
http_mocker.post(
_job_status_request(report_run_id_1).build(), _job_status_response(job_id_1, account_id=account_id_1)
)
http_mocker.post(_job_status_request(report_run_id_1).build(), _job_status_response(job_id_1, account_id=account_id_1))
http_mocker.get(
_get_insights_request(job_id_1).build(),
_insights_response().with_record(_ads_insights_action_product_id_record()).build(),
)
http_mocker.get(
get_account_request().with_account_id(account_id_2).build(), get_account_response(account_id=account_id_2)
)
http_mocker.get(
_update_api_throttle_limit_request().with_account_id(account_id_2).build(), api_throttle_limit_response
)
http_mocker.get(get_account_request().with_account_id(account_id_2).build(), get_account_response(account_id=account_id_2))
http_mocker.get(_update_api_throttle_limit_request().with_account_id(account_id_2).build(), api_throttle_limit_response)
http_mocker.post(
_job_start_request(since=start_date, until=end_date).with_account_id(account_id_2).build(),
_job_start_response(report_run_id_2),
)
http_mocker.post(
_job_status_request(report_run_id_2).build(), _job_status_response(job_id_2, account_id=account_id_2)
)
http_mocker.post(_job_status_request(report_run_id_2).build(), _job_status_response(job_id_2, account_id=account_id_2))
http_mocker.get(
_get_insights_request(job_id_2).build(),
_insights_response().with_record(_ads_insights_action_product_id_record()).build(),
)
output = self._read(
config().with_account_ids([account_id_1, account_id_2]).with_start_date(start_date).with_end_date(end_date)
)
cursor_value_from_state_account_1 = output.most_recent_state.get(_STREAM_NAME, {}).get(account_id_1, {}).get(
_CURSOR_FIELD
)
cursor_value_from_state_account_2 = output.most_recent_state.get(_STREAM_NAME, {}).get(account_id_2, {}).get(
_CURSOR_FIELD
)
output = self._read(config().with_account_ids([account_id_1, account_id_2]).with_start_date(start_date).with_end_date(end_date))
cursor_value_from_state_account_1 = output.most_recent_state.get(_STREAM_NAME, {}).get(account_id_1, {}).get(_CURSOR_FIELD)
cursor_value_from_state_account_2 = output.most_recent_state.get(_STREAM_NAME, {}).get(account_id_2, {}).get(_CURSOR_FIELD)
expected_cursor_value = start_date.strftime(DATE_FORMAT)
assert cursor_value_from_state_account_1 == expected_cursor_value
assert cursor_value_from_state_account_2 == expected_cursor_value

View File

@@ -62,18 +62,19 @@ _FIELDS = [
def _get_videos_request(account_id: Optional[str] = ACCOUNT_ID) -> RequestBuilder:
return RequestBuilder.get_videos_endpoint(
access_token=ACCESS_TOKEN, account_id=account_id
).with_limit(100).with_fields(_FIELDS).with_summary()
return (
RequestBuilder.get_videos_endpoint(access_token=ACCESS_TOKEN, account_id=account_id)
.with_limit(100)
.with_fields(_FIELDS)
.with_summary()
)
def _get_videos_response() -> HttpResponseBuilder:
return create_response_builder(
response_template=find_template(_STREAM_NAME, __file__),
records_path=FieldPath("data"),
pagination_strategy=FacebookMarketingPaginationStrategy(
request=_get_videos_request().build(), next_page_token=NEXT_PAGE_TOKEN
),
pagination_strategy=FacebookMarketingPaginationStrategy(request=_get_videos_request().build(), next_page_token=NEXT_PAGE_TOKEN),
)
@@ -88,7 +89,6 @@ def _video_record() -> RecordBuilder:
@freezegun.freeze_time(NOW.isoformat())
class TestFullRefresh(TestCase):
@staticmethod
def _read(config_: ConfigBuilder, expecting_exception: bool = False) -> EntrypointOutput:
return read_output(
@@ -131,22 +131,16 @@ class TestFullRefresh(TestCase):
assert len(output.records) == 3
@HttpMocker()
def test_given_multiple_account_ids_when_read_then_return_records_from_all_accounts(
self, http_mocker: HttpMocker
) -> None:
def test_given_multiple_account_ids_when_read_then_return_records_from_all_accounts(self, http_mocker: HttpMocker) -> None:
account_id_1 = "123123123"
account_id_2 = "321321321"
http_mocker.get(
get_account_request().with_account_id(account_id_1).build(), get_account_response(account_id=account_id_1)
)
http_mocker.get(get_account_request().with_account_id(account_id_1).build(), get_account_response(account_id=account_id_1))
http_mocker.get(
_get_videos_request().with_account_id(account_id_1).build(),
_get_videos_response().with_record(_video_record()).build(),
)
http_mocker.get(
get_account_request().with_account_id(account_id_2).build(), get_account_response(account_id=account_id_2)
)
http_mocker.get(get_account_request().with_account_id(account_id_2).build(), get_account_response(account_id=account_id_2))
http_mocker.get(
_get_videos_request().with_account_id(account_id_2).build(),
_get_videos_response().with_record(_video_record()).build(),
@@ -159,9 +153,7 @@ class TestFullRefresh(TestCase):
def test_when_read_then_add_account_id_field(self, http_mocker: HttpMocker) -> None:
account_id = "123123123"
http_mocker.get(
get_account_request().with_account_id(account_id).build(), get_account_response(account_id=account_id)
)
http_mocker.get(get_account_request().with_account_id(account_id).build(), get_account_response(account_id=account_id))
http_mocker.get(
_get_videos_request().with_account_id(account_id).build(),
_get_videos_response().with_record(_video_record()).build(),
@@ -179,9 +171,7 @@ class TestFullRefresh(TestCase):
http_mocker.get(get_account_request().build(), get_account_response())
http_mocker.get(
_get_videos_request().with_fields(_FIELDS).with_summary().build(),
_get_videos_response().with_record(
_video_record().with_field(FieldPath(created_time_field), input_datetime_value)
).build(),
_get_videos_response().with_record(_video_record().with_field(FieldPath(created_time_field), input_datetime_value)).build(),
)
output = self._read(config())
@@ -224,20 +214,17 @@ class TestIncremental(TestCase):
max_cursor_value = "2024-02-01T00:00:00+00:00"
account_id = "123123123"
http_mocker.get(
get_account_request().with_account_id(account_id).build(), get_account_response(account_id=account_id)
)
http_mocker.get(get_account_request().with_account_id(account_id).build(), get_account_response(account_id=account_id))
http_mocker.get(
_get_videos_request().with_account_id(account_id).build(),
_get_videos_response().with_record(_video_record().with_cursor(max_cursor_value)).with_record(
_video_record().with_cursor(min_cursor_value)
).build(),
_get_videos_response()
.with_record(_video_record().with_cursor(max_cursor_value))
.with_record(_video_record().with_cursor(min_cursor_value))
.build(),
)
output = self._read(config().with_account_ids([account_id]))
cursor_value_from_state_message = output.most_recent_state.get(_STREAM_NAME, {}).get(account_id, {}).get(
_CURSOR_FIELD
)
cursor_value_from_state_message = output.most_recent_state.get(_STREAM_NAME, {}).get(account_id, {}).get(_CURSOR_FIELD)
assert cursor_value_from_state_message == max_cursor_value
@HttpMocker()
@@ -251,54 +238,44 @@ class TestIncremental(TestCase):
min_cursor_value_account_id_2 = "2024-03-01T00:00:00+00:00"
max_cursor_value_account_id_2 = "2024-04-01T00:00:00+00:00"
http_mocker.get(
get_account_request().with_account_id(account_id_1).build(), get_account_response(account_id=account_id_1)
)
http_mocker.get(get_account_request().with_account_id(account_id_1).build(), get_account_response(account_id=account_id_1))
http_mocker.get(
_get_videos_request().with_account_id(account_id_1).build(),
_get_videos_response().with_record(_video_record().with_cursor(max_cursor_value_account_id_1)).with_record(
_video_record().with_cursor(min_cursor_value_account_id_1)
).build(),
)
http_mocker.get(
get_account_request().with_account_id(account_id_2).build(), get_account_response(account_id=account_id_2)
_get_videos_response()
.with_record(_video_record().with_cursor(max_cursor_value_account_id_1))
.with_record(_video_record().with_cursor(min_cursor_value_account_id_1))
.build(),
)
http_mocker.get(get_account_request().with_account_id(account_id_2).build(), get_account_response(account_id=account_id_2))
http_mocker.get(
_get_videos_request().with_account_id(account_id_2).build(),
_get_videos_response().with_record(_video_record().with_cursor(max_cursor_value_account_id_2)).with_record(
_video_record().with_cursor(min_cursor_value_account_id_2)
).build(),
_get_videos_response()
.with_record(_video_record().with_cursor(max_cursor_value_account_id_2))
.with_record(_video_record().with_cursor(min_cursor_value_account_id_2))
.build(),
)
output = self._read(config().with_account_ids([account_id_1, account_id_2]))
cursor_value_from_state_account_1 = output.most_recent_state.get(_STREAM_NAME, {}).get(account_id_1, {}).get(
_CURSOR_FIELD
)
cursor_value_from_state_account_2 = output.most_recent_state.get(_STREAM_NAME, {}).get(account_id_2, {}).get(
_CURSOR_FIELD
)
cursor_value_from_state_account_1 = output.most_recent_state.get(_STREAM_NAME, {}).get(account_id_1, {}).get(_CURSOR_FIELD)
cursor_value_from_state_account_2 = output.most_recent_state.get(_STREAM_NAME, {}).get(account_id_2, {}).get(_CURSOR_FIELD)
assert cursor_value_from_state_account_1 == max_cursor_value_account_id_1
assert cursor_value_from_state_account_2 == max_cursor_value_account_id_2
@HttpMocker()
def test_given_state_when_read_then_records_with_cursor_value_less_than_state_filtered(
self, http_mocker: HttpMocker
) -> None:
def test_given_state_when_read_then_records_with_cursor_value_less_than_state_filtered(self, http_mocker: HttpMocker) -> None:
account_id = "123123123"
cursor_value_1 = "2024-01-01T00:00:00+00:00"
cursor_value_2 = "2024-01-02T00:00:00+00:00"
cursor_value_3 = "2024-01-03T00:00:00+00:00"
http_mocker.get(
get_account_request().with_account_id(account_id).build(), get_account_response(account_id=account_id)
)
http_mocker.get(get_account_request().with_account_id(account_id).build(), get_account_response(account_id=account_id))
http_mocker.get(
_get_videos_request().with_account_id(account_id).build(),
_get_videos_response().with_record(_video_record().with_cursor(cursor_value_3)).with_record(
_video_record().with_cursor(cursor_value_2)
).with_record(
_video_record().with_cursor(cursor_value_1)
).build(),
_get_videos_response()
.with_record(_video_record().with_cursor(cursor_value_3))
.with_record(_video_record().with_cursor(cursor_value_2))
.with_record(_video_record().with_cursor(cursor_value_1))
.build(),
)
output = self._read(
@@ -317,27 +294,23 @@ class TestIncremental(TestCase):
cursor_value_2 = "2024-01-02T00:00:00+00:00"
cursor_value_3 = "2024-01-03T00:00:00+00:00"
http_mocker.get(
get_account_request().with_account_id(account_id_1).build(), get_account_response(account_id=account_id_1)
)
http_mocker.get(get_account_request().with_account_id(account_id_1).build(), get_account_response(account_id=account_id_1))
http_mocker.get(
_get_videos_request().with_account_id(account_id_1).build(),
_get_videos_response().with_record(_video_record().with_cursor(cursor_value_3)).with_record(
_video_record().with_cursor(cursor_value_2)
).with_record(
_video_record().with_cursor(cursor_value_1)
).build(),
)
http_mocker.get(
get_account_request().with_account_id(account_id_2).build(), get_account_response(account_id=account_id_2)
_get_videos_response()
.with_record(_video_record().with_cursor(cursor_value_3))
.with_record(_video_record().with_cursor(cursor_value_2))
.with_record(_video_record().with_cursor(cursor_value_1))
.build(),
)
http_mocker.get(get_account_request().with_account_id(account_id_2).build(), get_account_response(account_id=account_id_2))
http_mocker.get(
_get_videos_request().with_account_id(account_id_2).build(),
_get_videos_response().with_record(_video_record().with_cursor(cursor_value_3)).with_record(
_video_record().with_cursor(cursor_value_2)
).with_record(
_video_record().with_cursor(cursor_value_1)
).build(),
_get_videos_response()
.with_record(_video_record().with_cursor(cursor_value_3))
.with_record(_video_record().with_cursor(cursor_value_2))
.with_record(_video_record().with_cursor(cursor_value_1))
.build(),
)
stream_state = {account_id_1: {_CURSOR_FIELD: cursor_value_2}, account_id_2: {_CURSOR_FIELD: cursor_value_2}}

View File

@@ -14,9 +14,7 @@ FB_API_VERSION = FacebookAdsApi.API_VERSION
class TestMyFacebookAdsApi:
@pytest.fixture
def fb_api(self):
return source_facebook_marketing.api.MyFacebookAdsApi.init(
access_token="foo", crash_log=False
)
return source_facebook_marketing.api.MyFacebookAdsApi.init(access_token="foo", crash_log=False)
@pytest.mark.parametrize(
"max_rate,max_pause_interval,min_pause_interval,usage,pause_interval,expected_pause_interval",
@@ -120,9 +118,7 @@ class TestMyFacebookAdsApi:
]
mock_parse_call_rate_header = mocker.Mock(side_effect=usages_pause_intervals)
mocker.patch.object(
fb_api, "_parse_call_rate_header", mock_parse_call_rate_header
)
mocker.patch.object(fb_api, "_parse_call_rate_header", mock_parse_call_rate_header)
mocker.patch.object(fb_api, "MIN_PAUSE_INTERVAL", min_pause_interval)
output = fb_api._get_max_usage_pause_interval_from_batch(records)
@@ -145,9 +141,7 @@ class TestMyFacebookAdsApi:
(["not_batch"], 2, 1, False),
],
)
def test__handle_call_rate_limit(
self, mocker, fb_api, params, min_rate, usage, expect_sleep
):
def test__handle_call_rate_limit(self, mocker, fb_api, params, min_rate, usage, expect_sleep):
pause_interval = 1
mock_response = mocker.Mock()
@@ -167,20 +161,12 @@ class TestMyFacebookAdsApi:
mocker.patch.object(source_facebook_marketing.api, "sleep")
assert fb_api._handle_call_rate_limit(mock_response, params) is None
if "batch" in params:
fb_api._get_max_usage_pause_interval_from_batch.assert_called_with(
mock_response.json.return_value
)
fb_api._get_max_usage_pause_interval_from_batch.assert_called_with(mock_response.json.return_value)
else:
fb_api._parse_call_rate_header.assert_called_with(
mock_response.headers.return_value
)
fb_api._parse_call_rate_header.assert_called_with(mock_response.headers.return_value)
if expect_sleep:
fb_api._compute_pause_interval.assert_called_with(
usage=usage, pause_interval=pause_interval
)
source_facebook_marketing.api.sleep.assert_called_with(
fb_api._compute_pause_interval.return_value.total_seconds()
)
fb_api._compute_pause_interval.assert_called_with(usage=usage, pause_interval=pause_interval)
source_facebook_marketing.api.sleep.assert_called_with(fb_api._compute_pause_interval.return_value.total_seconds())
source_facebook_marketing.api.logger.warning.assert_called_with(
f"Utilization is too high ({usage})%, pausing for {fb_api._compute_pause_interval.return_value}"
)

View File

@@ -267,9 +267,7 @@ class TestInsightAsyncJob:
kwargs["failure"](response)
def test_elapsed_time(self, job, api, adreport):
assert (
job.elapsed_time is None
), "should be None for the job that is not started"
assert job.elapsed_time is None, "should be None for the job that is not started"
job.start()
adreport["async_status"] = Status.COMPLETED.value
@@ -317,10 +315,7 @@ class TestInsightAsyncJob:
job_timeout=pendulum.duration(minutes=60),
)
assert (
str(job)
== f"InsightAsyncJob(id=<None>, {account}, time_range=<Period [2010-01-01 -> 2011-01-01]>, breakdowns=[10, 20])"
)
assert str(job) == f"InsightAsyncJob(id=<None>, {account}, time_range=<Period [2010-01-01 -> 2011-01-01]>, breakdowns=[10, 20])"
def test_get_result(self, job, adreport, api):
job.start()
@@ -375,9 +370,7 @@ class TestInsightAsyncJob:
def test_split_job(self, mocker, api, edge_class, next_edge_class, id_field):
"""Test that split will correctly downsize edge_object"""
today = pendulum.today().date()
start, end = today - pendulum.duration(
days=365 * 3 + 20
), today - pendulum.duration(days=365 * 3 + 10)
start, end = today - pendulum.duration(days=365 * 3 + 20), today - pendulum.duration(days=365 * 3 + 10)
params = {"time_increment": 1, "breakdowns": []}
job = InsightAsyncJob(
api=api,
@@ -404,9 +397,7 @@ class TestInsightAsyncJob:
# with the one 37 months ago, that's why current date is frozen.
# For a different date the since date would also be different.
# See facebook_marketing.utils.validate_start_date for reference
"since": (
today - pendulum.duration(months=37) + pendulum.duration(days=1)
).to_date_string(),
"since": (today - pendulum.duration(months=37) + pendulum.duration(days=1)).to_date_string(),
"until": end.to_date_string(),
},
}
@@ -415,16 +406,11 @@ class TestInsightAsyncJob:
assert all(j.interval == job.interval for j in small_jobs)
for i, small_job in enumerate(small_jobs, start=1):
assert small_job._params["time_range"] == job._params["time_range"]
assert (
str(small_job)
== f"InsightAsyncJob(id=<None>, {next_edge_class(i)}, time_range={job.interval}, breakdowns={[]})"
)
assert str(small_job) == f"InsightAsyncJob(id=<None>, {next_edge_class(i)}, time_range={job.interval}, breakdowns={[]})"
def test_split_job_smallest(self, mocker, api):
"""Test that split will correctly downsize edge_object"""
interval = pendulum.Period(
pendulum.Date(2010, 1, 1), pendulum.Date(2010, 1, 10)
)
interval = pendulum.Period(pendulum.Date(2010, 1, 1), pendulum.Date(2010, 1, 10))
params = {"time_increment": 1, "breakdowns": []}
job = InsightAsyncJob(
api=api,
@@ -434,9 +420,7 @@ class TestInsightAsyncJob:
job_timeout=pendulum.duration(minutes=60),
)
with pytest.raises(
ValueError, match="The job is already splitted to the smallest size."
):
with pytest.raises(ValueError, match="The job is already splitted to the smallest size."):
job.split_job()
@@ -511,9 +495,7 @@ class TestParentAsyncJob:
small_jobs = parent_job.split_job()
assert (
len(small_jobs) == len(grouped_jobs) + 5 - 2
), "each failed job must be replaced with its split"
assert len(small_jobs) == len(grouped_jobs) + 5 - 2, "each failed job must be replaced with its split"
for i, job in enumerate(grouped_jobs):
if i in (0, 5):
job.split_job.assert_called_once()
@@ -535,7 +517,4 @@ class TestParentAsyncJob:
count += 1
def test_str(self, parent_job, grouped_jobs):
assert (
str(parent_job)
== f"ParentAsyncJob({grouped_jobs[0]} ... {len(grouped_jobs) - 1} jobs more)"
)
assert str(parent_job) == f"ParentAsyncJob({grouped_jobs[0]} ... {len(grouped_jobs) - 1} jobs more)"

View File

@@ -25,17 +25,13 @@ def time_mock_fixture(mocker):
@pytest.fixture(name="update_job_mock")
def update_job_mock_fixture(mocker):
return mocker.patch(
"source_facebook_marketing.streams.async_job_manager.update_in_batch"
)
return mocker.patch("source_facebook_marketing.streams.async_job_manager.update_in_batch")
class TestInsightAsyncManager:
def test_jobs_empty(self, api, some_config):
"""Should work event without jobs"""
manager = InsightAsyncJobManager(
api=api, jobs=[], account_id=some_config["account_ids"][0]
)
manager = InsightAsyncJobManager(api=api, jobs=[], account_id=some_config["account_ids"][0])
jobs = list(manager.completed_jobs())
assert not jobs
@@ -45,9 +41,7 @@ class TestInsightAsyncManager:
mocker.Mock(spec=InsightAsyncJob, attempt_number=1, failed=False),
mocker.Mock(spec=InsightAsyncJob, attempt_number=1, failed=False),
]
manager = InsightAsyncJobManager(
api=api, jobs=jobs, account_id=some_config["account_ids"][0]
)
manager = InsightAsyncJobManager(api=api, jobs=jobs, account_id=some_config["account_ids"][0])
completed_jobs = list(manager.completed_jobs())
assert jobs == completed_jobs
time_mock.sleep.assert_not_called()
@@ -64,16 +58,10 @@ class TestInsightAsyncManager:
update_job_mock.side_effect = update_job_behaviour()
jobs = [
mocker.Mock(
spec=InsightAsyncJob, attempt_number=1, failed=False, completed=False
),
mocker.Mock(
spec=InsightAsyncJob, attempt_number=1, failed=False, completed=False
),
mocker.Mock(spec=InsightAsyncJob, attempt_number=1, failed=False, completed=False),
mocker.Mock(spec=InsightAsyncJob, attempt_number=1, failed=False, completed=False),
]
manager = InsightAsyncJobManager(
api=api, jobs=jobs, account_id=some_config["account_ids"][0]
)
manager = InsightAsyncJobManager(api=api, jobs=jobs, account_id=some_config["account_ids"][0])
job = next(manager.completed_jobs(), None)
assert job == jobs[1]
@@ -81,9 +69,7 @@ class TestInsightAsyncManager:
job = next(manager.completed_jobs(), None)
assert job == jobs[0]
time_mock.sleep.assert_called_with(
InsightAsyncJobManager.JOB_STATUS_UPDATE_SLEEP_SECONDS
)
time_mock.sleep.assert_called_with(InsightAsyncJobManager.JOB_STATUS_UPDATE_SLEEP_SECONDS)
job = next(manager.completed_jobs(), None)
assert job is None
@@ -100,16 +86,10 @@ class TestInsightAsyncManager:
update_job_mock.side_effect = update_job_behaviour()
jobs = [
mocker.Mock(
spec=InsightAsyncJob, attempt_number=1, failed=False, completed=True
),
mocker.Mock(
spec=InsightAsyncJob, attempt_number=1, failed=False, completed=False
),
mocker.Mock(spec=InsightAsyncJob, attempt_number=1, failed=False, completed=True),
mocker.Mock(spec=InsightAsyncJob, attempt_number=1, failed=False, completed=False),
]
manager = InsightAsyncJobManager(
api=api, jobs=jobs, account_id=some_config["account_ids"][0]
)
manager = InsightAsyncJobManager(api=api, jobs=jobs, account_id=some_config["account_ids"][0])
job = next(manager.completed_jobs(), None)
assert job == jobs[0]
@@ -131,27 +111,17 @@ class TestInsightAsyncManager:
update_job_mock.side_effect = update_job_behaviour()
jobs = [
mocker.Mock(
spec=InsightAsyncJob, attempt_number=1, failed=False, completed=True
),
mocker.Mock(
spec=InsightAsyncJob, attempt_number=1, failed=False, completed=False
),
mocker.Mock(spec=InsightAsyncJob, attempt_number=1, failed=False, completed=True),
mocker.Mock(spec=InsightAsyncJob, attempt_number=1, failed=False, completed=False),
]
sub_jobs = [
mocker.Mock(
spec=InsightAsyncJob, attempt_number=1, failed=False, completed=True
),
mocker.Mock(
spec=InsightAsyncJob, attempt_number=1, failed=False, completed=True
),
mocker.Mock(spec=InsightAsyncJob, attempt_number=1, failed=False, completed=True),
mocker.Mock(spec=InsightAsyncJob, attempt_number=1, failed=False, completed=True),
]
sub_jobs[0].get_result.return_value = [1, 2]
sub_jobs[1].get_result.return_value = [3, 4]
jobs[1].split_job.return_value = sub_jobs
manager = InsightAsyncJobManager(
api=api, jobs=jobs, account_id=some_config["account_ids"][0]
)
manager = InsightAsyncJobManager(api=api, jobs=jobs, account_id=some_config["account_ids"][0])
job = next(manager.completed_jobs(), None)
assert job == jobs[0]
@@ -164,9 +134,7 @@ class TestInsightAsyncManager:
job = next(manager.completed_jobs(), None)
assert job is None
def test_job_failed_too_many_times(
self, api, mocker, time_mock, update_job_mock, some_config
):
def test_job_failed_too_many_times(self, api, mocker, time_mock, update_job_mock, some_config):
"""Manager should fail when job failed too many times"""
def update_job_behaviour():
@@ -176,16 +144,10 @@ class TestInsightAsyncManager:
update_job_mock.side_effect = update_job_behaviour()
jobs = [
mocker.Mock(
spec=InsightAsyncJob, attempt_number=1, failed=False, completed=True
),
mocker.Mock(
spec=InsightAsyncJob, attempt_number=1, failed=False, completed=False
),
mocker.Mock(spec=InsightAsyncJob, attempt_number=1, failed=False, completed=True),
mocker.Mock(spec=InsightAsyncJob, attempt_number=1, failed=False, completed=False),
]
manager = InsightAsyncJobManager(
api=api, jobs=jobs, account_id=some_config["account_ids"][0]
)
manager = InsightAsyncJobManager(api=api, jobs=jobs, account_id=some_config["account_ids"][0])
with pytest.raises(
JobException,
@@ -193,9 +155,7 @@ class TestInsightAsyncManager:
):
next(manager.completed_jobs(), None)
def test_nested_job_failed_too_many_times(
self, api, mocker, time_mock, update_job_mock, some_config
):
def test_nested_job_failed_too_many_times(self, api, mocker, time_mock, update_job_mock, some_config):
"""Manager should fail when a nested job within a ParentAsyncJob failed too many times"""
def update_job_behaviour():
@@ -206,17 +166,11 @@ class TestInsightAsyncManager:
update_job_mock.side_effect = update_job_behaviour()
sub_jobs = [
mocker.Mock(
spec=InsightAsyncJob, attempt_number=1, failed=False, completed=True
),
mocker.Mock(
spec=InsightAsyncJob, attempt_number=1, failed=False, completed=False
),
mocker.Mock(spec=InsightAsyncJob, attempt_number=1, failed=False, completed=True),
mocker.Mock(spec=InsightAsyncJob, attempt_number=1, failed=False, completed=False),
]
jobs = [
mocker.Mock(
spec=InsightAsyncJob, attempt_number=1, failed=False, completed=True
),
mocker.Mock(spec=InsightAsyncJob, attempt_number=1, failed=False, completed=True),
mocker.Mock(
spec=ParentAsyncJob,
_jobs=sub_jobs,
@@ -225,9 +179,7 @@ class TestInsightAsyncManager:
completed=False,
),
]
manager = InsightAsyncJobManager(
api=api, jobs=jobs, account_id=some_config["account_ids"][0]
)
manager = InsightAsyncJobManager(api=api, jobs=jobs, account_id=some_config["account_ids"][0])
with pytest.raises(JobException):
next(manager.completed_jobs(), None)

View File

@@ -36,18 +36,14 @@ def start_date_fixture():
@pytest.fixture(name="async_manager_mock")
def async_manager_mock_fixture(mocker):
mock = mocker.patch(
"source_facebook_marketing.streams.base_insight_streams.InsightAsyncJobManager"
)
mock = mocker.patch("source_facebook_marketing.streams.base_insight_streams.InsightAsyncJobManager")
mock.return_value = mock
return mock
@pytest.fixture(name="async_job_mock")
def async_job_mock_fixture(mocker):
mock = mocker.patch(
"source_facebook_marketing.streams.base_insight_streams.InsightAsyncJob"
)
mock = mocker.patch("source_facebook_marketing.streams.base_insight_streams.InsightAsyncJob")
mock.side_effect = lambda api, **kwargs: {"api": api, **kwargs}
@@ -101,9 +97,7 @@ class TestBaseInsightsStream:
"""
job = mocker.Mock(spec=InsightAsyncJob)
job.get_result.return_value = [mocker.Mock(), mocker.Mock(), mocker.Mock()]
job.interval = pendulum.Period(
pendulum.date(2010, 1, 1), pendulum.date(2010, 1, 1)
)
job.interval = pendulum.Period(pendulum.date(2010, 1, 1), pendulum.date(2010, 1, 1))
stream = AdsInsights(
api=api,
account_ids=some_config["account_ids"],
@@ -131,9 +125,7 @@ class TestBaseInsightsStream:
"""
job = mocker.Mock(spec=AsyncJob)
job.get_result.return_value = [mocker.Mock(), mocker.Mock(), mocker.Mock()]
job.interval = pendulum.Period(
pendulum.date(2010, 1, 1), pendulum.date(2010, 1, 1)
)
job.interval = pendulum.Period(pendulum.date(2010, 1, 1), pendulum.date(2010, 1, 1))
stream = AdsInsights(
api=api,
account_ids=some_config["account_ids"],
@@ -258,16 +250,12 @@ class TestBaseInsightsStream:
actual_state = stream.state
result_state = state if not result_state else result_state
result_state[some_config["account_ids"][0]]["slices"] = result_state[
some_config["account_ids"][0]
].get("slices", set())
result_state[some_config["account_ids"][0]]["slices"] = result_state[some_config["account_ids"][0]].get("slices", set())
result_state["time_increment"] = 1
assert actual_state == result_state
def test_stream_slices_no_state(
self, api, async_manager_mock, start_date, some_config
):
def test_stream_slices_no_state(self, api, async_manager_mock, start_date, some_config):
"""Stream will use start_date when there is not state"""
end_date = start_date + duration(weeks=2)
stream = AdsInsights(
@@ -279,9 +267,7 @@ class TestBaseInsightsStream:
)
async_manager_mock.completed_jobs.return_value = [1, 2, 3]
slices = list(
stream.stream_slices(stream_state=None, sync_mode=SyncMode.incremental)
)
slices = list(stream.stream_slices(stream_state=None, sync_mode=SyncMode.incremental))
assert slices == [
{"account_id": "unknown_account", "insight_job": 1},
@@ -295,9 +281,7 @@ class TestBaseInsightsStream:
assert generated_jobs[0].interval.start == start_date.date()
assert generated_jobs[1].interval.start == start_date.date() + duration(days=1)
def test_stream_slices_no_state_close_to_now(
self, api, async_manager_mock, recent_start_date, some_config
):
def test_stream_slices_no_state_close_to_now(self, api, async_manager_mock, recent_start_date, some_config):
"""Stream will use start_date when there is not state and start_date within 28d from now"""
start_date = recent_start_date
end_date = pendulum.now()
@@ -310,9 +294,7 @@ class TestBaseInsightsStream:
)
async_manager_mock.completed_jobs.return_value = [1, 2, 3]
slices = list(
stream.stream_slices(stream_state=None, sync_mode=SyncMode.incremental)
)
slices = list(stream.stream_slices(stream_state=None, sync_mode=SyncMode.incremental))
assert slices == [
{"account_id": "unknown_account", "insight_job": 1},
@@ -326,9 +308,7 @@ class TestBaseInsightsStream:
assert generated_jobs[0].interval.start == start_date.date()
assert generated_jobs[1].interval.start == start_date.date() + duration(days=1)
def test_stream_slices_with_state(
self, api, async_manager_mock, start_date, some_config
):
def test_stream_slices_with_state(self, api, async_manager_mock, start_date, some_config):
"""Stream will use cursor_value from state when there is state"""
end_date = start_date + duration(days=10)
cursor_value = start_date + duration(days=5)
@@ -342,9 +322,7 @@ class TestBaseInsightsStream:
)
async_manager_mock.completed_jobs.return_value = [1, 2, 3]
slices = list(
stream.stream_slices(stream_state=state, sync_mode=SyncMode.incremental)
)
slices = list(stream.stream_slices(stream_state=state, sync_mode=SyncMode.incremental))
assert slices == [
{"account_id": "unknown_account", "insight_job": 1},
@@ -355,16 +333,10 @@ class TestBaseInsightsStream:
args, kwargs = async_manager_mock.call_args
generated_jobs = list(kwargs["jobs"])
assert len(generated_jobs) == (end_date - cursor_value).days
assert generated_jobs[0].interval.start == cursor_value.date() + duration(
days=1
)
assert generated_jobs[1].interval.start == cursor_value.date() + duration(
days=2
)
assert generated_jobs[0].interval.start == cursor_value.date() + duration(days=1)
assert generated_jobs[1].interval.start == cursor_value.date() + duration(days=2)
def test_stream_slices_with_state_close_to_now(
self, api, async_manager_mock, recent_start_date, some_config
):
def test_stream_slices_with_state_close_to_now(self, api, async_manager_mock, recent_start_date, some_config):
"""Stream will use start_date when close to now and start_date close to now"""
start_date = recent_start_date
end_date = pendulum.now()
@@ -379,9 +351,7 @@ class TestBaseInsightsStream:
)
async_manager_mock.completed_jobs.return_value = [1, 2, 3]
slices = list(
stream.stream_slices(stream_state=state, sync_mode=SyncMode.incremental)
)
slices = list(stream.stream_slices(stream_state=state, sync_mode=SyncMode.incremental))
assert slices == [
{"account_id": "unknown_account", "insight_job": 1},
@@ -396,9 +366,7 @@ class TestBaseInsightsStream:
assert generated_jobs[1].interval.start == start_date.date() + duration(days=1)
@pytest.mark.parametrize("state_format", ["old_format", "new_format"])
def test_stream_slices_with_state_and_slices(
self, api, async_manager_mock, start_date, some_config, state_format
):
def test_stream_slices_with_state_and_slices(self, api, async_manager_mock, start_date, some_config, state_format):
"""Stream will use cursor_value from state, but will skip saved slices"""
end_date = start_date + duration(days=10)
cursor_value = start_date + duration(days=5)
@@ -430,9 +398,7 @@ class TestBaseInsightsStream:
)
async_manager_mock.completed_jobs.return_value = [1, 2, 3]
slices = list(
stream.stream_slices(stream_state=state, sync_mode=SyncMode.incremental)
)
slices = list(stream.stream_slices(stream_state=state, sync_mode=SyncMode.incremental))
assert slices == [
{"account_id": "unknown_account", "insight_job": 1},
@@ -442,15 +408,9 @@ class TestBaseInsightsStream:
async_manager_mock.assert_called_once()
args, kwargs = async_manager_mock.call_args
generated_jobs = list(kwargs["jobs"])
assert (
len(generated_jobs) == (end_date - cursor_value).days - 2
), "should be 2 slices short because of state"
assert generated_jobs[0].interval.start == cursor_value.date() + duration(
days=2
)
assert generated_jobs[1].interval.start == cursor_value.date() + duration(
days=4
)
assert len(generated_jobs) == (end_date - cursor_value).days - 2, "should be 2 slices short because of state"
assert generated_jobs[0].interval.start == cursor_value.date() + duration(days=2)
assert generated_jobs[1].interval.start == cursor_value.date() + duration(days=4)
def test_get_json_schema(self, api, some_config):
stream = AdsInsights(
@@ -465,9 +425,7 @@ class TestBaseInsightsStream:
assert "device_platform" not in schema["properties"]
assert "country" not in schema["properties"]
assert not (
set(stream.fields()) - set(schema["properties"].keys())
), "all fields present in schema"
assert not (set(stream.fields()) - set(schema["properties"].keys())), "all fields present in schema"
def test_get_json_schema_custom(self, api, some_config):
stream = AdsInsights(
@@ -483,9 +441,7 @@ class TestBaseInsightsStream:
assert "device_platform" in schema["properties"]
assert "country" in schema["properties"]
assert not (
set(stream.fields()) - set(schema["properties"].keys())
), "all fields present in schema"
assert not (set(stream.fields()) - set(schema["properties"].keys())), "all fields present in schema"
def test_fields(self, api, some_config):
stream = AdsInsights(

View File

@@ -112,15 +112,11 @@ class ConcreteFBMarketingIncrementalStream(FBMarketingIncrementalStream):
@pytest.fixture
def incremental_class_instance(api):
return ConcreteFBMarketingIncrementalStream(
api=api, account_ids=["123", "456", "789"], start_date=None, end_date=None
)
return ConcreteFBMarketingIncrementalStream(api=api, account_ids=["123", "456", "789"], start_date=None, end_date=None)
class TestFBMarketingIncrementalStreamSliceAndState:
def test_stream_slices_multiple_accounts_with_state(
self, incremental_class_instance
):
def test_stream_slices_multiple_accounts_with_state(self, incremental_class_instance):
stream_state = {
"123": {"state_key": "state_value"},
"456": {"state_key": "another_state_value"},
@@ -130,14 +126,9 @@ class TestFBMarketingIncrementalStreamSliceAndState:
{"account_id": "456", "stream_state": {"state_key": "another_state_value"}},
{"account_id": "789", "stream_state": {}},
]
assert (
list(incremental_class_instance.stream_slices(stream_state))
== expected_slices
)
assert list(incremental_class_instance.stream_slices(stream_state)) == expected_slices
def test_stream_slices_multiple_accounts_empty_state(
self, incremental_class_instance
):
def test_stream_slices_multiple_accounts_empty_state(self, incremental_class_instance):
expected_slices = [
{"account_id": "123", "stream_state": {}},
{"account_id": "456", "stream_state": {}},
@@ -149,10 +140,7 @@ class TestFBMarketingIncrementalStreamSliceAndState:
incremental_class_instance._account_ids = ["123"]
stream_state = {"state_key": "state_value"}
expected_slices = [{"account_id": "123", "stream_state": stream_state}]
assert (
list(incremental_class_instance.stream_slices(stream_state))
== expected_slices
)
assert list(incremental_class_instance.stream_slices(stream_state)) == expected_slices
def test_stream_slices_single_account_empty_state(self, incremental_class_instance):
incremental_class_instance._account_ids = ["123"]
@@ -270,7 +258,5 @@ class TestFBMarketingIncrementalStreamSliceAndState:
# Set the instance's filter_statuses
incremental_class_instance._filter_statuses = instance_filter_statuses
new_state = incremental_class_instance.get_updated_state(
current_stream_state, latest_record
)
new_state = incremental_class_instance.get_updated_state(current_stream_state, latest_record)
assert new_state == expected_state
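The expected slices above make the contract explicit; a minimal sketch consistent with them (assuming, as the single-account case implies, that legacy un-namespaced state is passed through whole when only one account is configured):

```python
def stream_slices_sketch(account_ids, stream_state=None):
    # With one account, pass the legacy (un-namespaced) state through as-is;
    # with several, give each slice that account's own sub-state, or {} when
    # the account has no saved state yet.
    stream_state = stream_state or {}
    if len(account_ids) == 1:
        yield {"account_id": account_ids[0], "stream_state": stream_state}
    else:
        for account_id in account_ids:
            yield {"account_id": account_id, "stream_state": stream_state.get(account_id, {})}
```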

View File

@@ -28,11 +28,7 @@ def fb_call_rate_response_fixture():
"fbtrace_id": "this_is_fake_response",
}
headers = {
"x-app-usage": json.dumps(
{"call_count": 28, "total_time": 25, "total_cputime": 25}
)
}
headers = {"x-app-usage": json.dumps({"call_count": 28, "total_time": 25, "total_cputime": 25})}
return {
"json": {
@@ -59,9 +55,7 @@ def fb_call_amount_data_response_fixture():
class TestBackoff:
def test_limit_reached(
self, mocker, requests_mock, api, fb_call_rate_response, account_id, some_config
):
def test_limit_reached(self, mocker, requests_mock, api, fb_call_rate_response, account_id, some_config):
"""Error once, check that we retry and not fail"""
# turn Campaigns into non batch mode to test non batch logic
campaign_responses = [
@@ -111,9 +105,7 @@ class TestBackoff:
except FacebookRequestError:
pytest.fail("Call rate error has not being handled")
def test_batch_limit_reached(
self, requests_mock, api, fb_call_rate_response, account_id
):
def test_batch_limit_reached(self, requests_mock, api, fb_call_rate_response, account_id):
"""Error once, check that we retry and not fail"""
responses = [
fb_call_rate_response,
@@ -164,9 +156,7 @@ class TestBackoff:
FacebookSession.GRAPH + f"/{FB_API_VERSION}/act_{account_id}/",
responses,
)
requests_mock.register_uri(
"POST", FacebookSession.GRAPH + f"/{FB_API_VERSION}/", batch_responses
)
requests_mock.register_uri("POST", FacebookSession.GRAPH + f"/{FB_API_VERSION}/", batch_responses)
stream = AdCreatives(api=api, account_ids=[account_id])
records = list(
@@ -244,9 +234,7 @@ class TestBackoff:
assert accounts == [account_data]
def test_limit_error_retry(
self, fb_call_amount_data_response, requests_mock, api, account_id
):
def test_limit_error_retry(self, fb_call_amount_data_response, requests_mock, api, account_id):
"""Error every time, check limit parameter decreases by 2 times every new call"""
res = requests_mock.register_uri(
@@ -368,13 +356,9 @@ class TestBackoff:
)
)
def test_limit_error_retry_next_page(
self, fb_call_amount_data_response, requests_mock, api, account_id
):
def test_limit_error_retry_next_page(self, fb_call_amount_data_response, requests_mock, api, account_id):
"""Unlike the previous test, this one tests the API call fail on the second or more page of a request."""
base_url = (
FacebookSession.GRAPH + f"/{FB_API_VERSION}/act_{account_id}/advideos"
)
base_url = FacebookSession.GRAPH + f"/{FB_API_VERSION}/act_{account_id}/advideos"
res = requests_mock.register_uri(
"GET",

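The docstrings above pin down the back-off contract: on repeated rate-limit errors the request is retried with a progressively smaller page size. A sketch of that halving step, with a hypothetical helper name:

```python
def halve_page_limit(params: dict, floor: int = 1) -> dict:
    # On each retry after a rate-limit error, cut the `limit` query
    # parameter in half, never dropping below `floor`.
    retried = dict(params)
    retried["limit"] = max(int(retried.get("limit", 100)) // 2, floor)
    return retried
```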
View File

@@ -112,7 +112,6 @@ class TestMigrateIncludeDeletedToStatusFilters:
)
def test_migrate_config(self, old_config_path, new_config_path, include_deleted):
migration_instance = MigrateIncludeDeletedToStatusFilters()
original_config = load_config(old_config_path)
# migrate the test_config
migration_instance.migrate([CMD, "--config", old_config_path], SOURCE)
# load the updated config

View File

@@ -253,11 +253,7 @@ class TestRealErrors:
},
},
"status_code": 400,
"headers": {
"x-app-usage": json.dumps(
{"call_count": 28, "total_time": 25, "total_cputime": 25}
)
},
"headers": {"x-app-usage": json.dumps({"call_count": 28, "total_time": 25, "total_cputime": 25})},
},
),
(
@@ -306,14 +302,10 @@ class TestRealErrors:
),
],
)
def test_retryable_error(
self, some_config, requests_mock, name, retryable_error_response
):
def test_retryable_error(self, some_config, requests_mock, name, retryable_error_response):
"""Error once, check that we retry and not fail"""
requests_mock.reset_mock()
requests_mock.register_uri(
"GET", f"{act_url}", [retryable_error_response, ad_account_response]
)
requests_mock.register_uri("GET", f"{act_url}", [retryable_error_response, ad_account_response])
requests_mock.register_uri(
"GET",
f"{act_url}adcreatives",
@@ -333,17 +325,13 @@ class TestRealErrors:
assert ad_creative_records == ad_creative_data
@pytest.mark.parametrize("name, friendly_msg, config_error_response", CONFIG_ERRORS)
def test_config_error_during_account_info_read(
self, requests_mock, name, friendly_msg, config_error_response
):
def test_config_error_during_account_info_read(self, requests_mock, name, friendly_msg, config_error_response):
"""Error raised during account info read"""
api = API(access_token=some_config["access_token"], page_size=100)
stream = AdCreatives(api=api, account_ids=some_config["account_ids"])
requests_mock.register_uri(
"GET", f"{act_url}", [config_error_response, ad_account_response]
)
requests_mock.register_uri("GET", f"{act_url}", [config_error_response, ad_account_response])
try:
list(
stream.read_records(
@@ -360,9 +348,7 @@ class TestRealErrors:
# @pytest.mark.parametrize("name, friendly_msg, config_error_response", [CONFIG_ERRORS[-1]])
@pytest.mark.parametrize("name, friendly_msg, config_error_response", CONFIG_ERRORS)
def test_config_error_during_actual_nodes_read(
self, requests_mock, name, friendly_msg, config_error_response
):
def test_config_error_during_actual_nodes_read(self, requests_mock, name, friendly_msg, config_error_response):
"""Error raised during actual nodes read"""
api = API(access_token=some_config["access_token"], page_size=100)
@@ -389,9 +375,7 @@ class TestRealErrors:
assert friendly_msg in error.message
@pytest.mark.parametrize("name, friendly_msg, config_error_response", CONFIG_ERRORS)
def test_config_error_insights_account_info_read(
self, requests_mock, name, friendly_msg, config_error_response
):
def test_config_error_insights_account_info_read(self, requests_mock, name, friendly_msg, config_error_response):
"""Error raised during actual nodes read"""
api = API(access_token=some_config["access_token"], page_size=100)
@@ -403,30 +387,18 @@ class TestRealErrors:
fields=["account_id", "account_currency"],
insights_lookback_window=28,
)
requests_mock.register_uri(
"GET", f"{act_url}", [config_error_response, ad_account_response]
)
requests_mock.register_uri("GET", f"{act_url}", [config_error_response, ad_account_response])
try:
slice = list(
stream.stream_slices(sync_mode=SyncMode.full_refresh, stream_state={})
)[0]
list(
stream.read_records(
sync_mode=SyncMode.full_refresh, stream_slice=slice, stream_state={}
)
)
slice = list(stream.stream_slices(sync_mode=SyncMode.full_refresh, stream_state={}))[0]
list(stream.read_records(sync_mode=SyncMode.full_refresh, stream_slice=slice, stream_state={}))
assert False
except Exception as error:
assert isinstance(error, AirbyteTracedException)
assert error.failure_type == FailureType.config_error
assert friendly_msg in error.message
@pytest.mark.parametrize(
"name, friendly_msg, config_error_response", [CONFIG_ERRORS[0]]
)
def test_config_error_insights_during_actual_nodes_read(
self, requests_mock, name, friendly_msg, config_error_response
):
@pytest.mark.parametrize("name, friendly_msg, config_error_response", [CONFIG_ERRORS[0]])
def test_config_error_insights_during_actual_nodes_read(self, requests_mock, name, friendly_msg, config_error_response):
"""Error raised during actual nodes read"""
api = API(access_token=some_config["access_token"], page_size=100)
@@ -439,19 +411,11 @@ class TestRealErrors:
insights_lookback_window=28,
)
requests_mock.register_uri("GET", f"{act_url}", [ad_account_response])
requests_mock.register_uri(
"GET", f"{act_url}insights", [config_error_response, ad_creative_response]
)
requests_mock.register_uri("GET", f"{act_url}insights", [config_error_response, ad_creative_response])
try:
slice = list(
stream.stream_slices(sync_mode=SyncMode.full_refresh, stream_state={})
)[0]
list(
stream.read_records(
sync_mode=SyncMode.full_refresh, stream_slice=slice, stream_state={}
)
)
slice = list(stream.stream_slices(sync_mode=SyncMode.full_refresh, stream_state={}))[0]
list(stream.read_records(sync_mode=SyncMode.full_refresh, stream_slice=slice, stream_state={}))
assert False
except Exception as error:
assert isinstance(error, AirbyteTracedException)
@@ -502,25 +466,17 @@ class TestRealErrors:
"account_id": account_id,
"business": {"id": "1", "name": "TEST"},
}
requests_mock.register_uri(
"GET", f"{base_url}me/business_users", status_code=200, json=business_user
)
requests_mock.register_uri("GET", f"{base_url}me/business_users", status_code=200, json=business_user)
assigned_users = {"account_id": account_id, "tasks": ["TASK"]}
requests_mock.register_uri(
"GET", f"{act_url}assigned_users", status_code=200, json=assigned_users
)
requests_mock.register_uri("GET", f"{act_url}assigned_users", status_code=200, json=assigned_users)
success_response = {"status_code": 200, "json": {"account_id": account_id}}
requests_mock.register_uri(
"GET", f"{act_url}", [failure_response, success_response]
)
requests_mock.register_uri("GET", f"{act_url}", [failure_response, success_response])
record_gen = stream.read_records(
sync_mode=SyncMode.full_refresh,
stream_slice={"account_id": account_id},
stream_state={},
)
assert list(record_gen) == [
{"account_id": "unknown_account", "id": "act_unknown_account"}
]
assert list(record_gen) == [{"account_id": "unknown_account", "id": "act_unknown_account"}]

View File

@@ -87,9 +87,7 @@ class TestSourceFacebookMarketing:
assert ok
assert not error_msg
def test_check_connection_find_account_was_called(
self, api_find_account, config, logger_mock, fb_marketing
):
def test_check_connection_find_account_was_called(self, api_find_account, config, logger_mock, fb_marketing):
"""Check if _find_account was called to validate credentials"""
ok, error_msg = fb_marketing.check_connection(logger_mock, config=config)
@@ -103,9 +101,7 @@ class TestSourceFacebookMarketing:
assert ok
assert not error_msg
def test_check_connection_future_date_range(
self, api, config, logger_mock, fb_marketing
):
def test_check_connection_future_date_range(self, api, config, logger_mock, fb_marketing):
config["start_date"] = "2219-10-10T00:00:00"
config["end_date"] = "2219-10-11T00:00:00"
assert fb_marketing.check_connection(logger_mock, config=config) == (
@@ -113,9 +109,7 @@ class TestSourceFacebookMarketing:
"Date range can not be in the future.",
)
def test_check_connection_end_date_before_start_date(
self, api, config, logger_mock, fb_marketing
):
def test_check_connection_end_date_before_start_date(self, api, config, logger_mock, fb_marketing):
config["start_date"] = "2019-10-10T00:00:00"
config["end_date"] = "2019-10-09T00:00:00"
assert fb_marketing.check_connection(logger_mock, config=config) == (
@@ -130,9 +124,7 @@ class TestSourceFacebookMarketing:
assert not ok
assert error_msg
def test_check_connection_config_no_start_date(
self, api, config, logger_mock, fb_marketing
):
def test_check_connection_config_no_start_date(self, api, config, logger_mock, fb_marketing):
config.pop("start_date")
ok, error_msg = fb_marketing.check_connection(logger_mock, config=config)
@@ -169,9 +161,7 @@ class TestSourceFacebookMarketing:
config = ConnectorConfig.parse_obj(config)
assert fb_marketing.get_custom_insights_streams(api, config)
def test_get_custom_insights_action_breakdowns_allow_empty(
self, api, config, fb_marketing
):
def test_get_custom_insights_action_breakdowns_allow_empty(self, api, config, fb_marketing):
config["custom_insights"] = [
{
"name": "test",
@@ -182,9 +172,7 @@ class TestSourceFacebookMarketing:
]
config["action_breakdowns_allow_empty"] = False
streams = fb_marketing.get_custom_insights_streams(
api, ConnectorConfig.parse_obj(config)
)
streams = fb_marketing.get_custom_insights_streams(api, ConnectorConfig.parse_obj(config))
assert len(streams) == 1
assert streams[0].breakdowns == ["ad_format_asset"]
assert streams[0].action_breakdowns == [
@@ -194,9 +182,7 @@ class TestSourceFacebookMarketing:
]
config["action_breakdowns_allow_empty"] = True
streams = fb_marketing.get_custom_insights_streams(
api, ConnectorConfig.parse_obj(config)
)
streams = fb_marketing.get_custom_insights_streams(api, ConnectorConfig.parse_obj(config))
assert len(streams) == 1
assert streams[0].breakdowns == ["ad_format_asset"]
assert streams[0].action_breakdowns == []
@@ -223,13 +209,9 @@ class TestSourceFacebookMarketing:
def test_check_config(config_gen, requests_mock, fb_marketing):
requests_mock.register_uri(
"GET", FacebookSession.GRAPH + f"/{FacebookAdsApi.API_VERSION}/act_123/", {}
)
requests_mock.register_uri("GET", FacebookSession.GRAPH + f"/{FacebookAdsApi.API_VERSION}/act_123/", {})
assert command_check(fb_marketing, config_gen()) == AirbyteConnectionStatus(
status=Status.SUCCEEDED, message=None
)
assert command_check(fb_marketing, config_gen()) == AirbyteConnectionStatus(status=Status.SUCCEEDED, message=None)
status = command_check(fb_marketing, config_gen(start_date="2019-99-10T00:00:00Z"))
assert status.status == Status.FAILED
@@ -240,9 +222,5 @@ def test_check_config(config_gen, requests_mock, fb_marketing):
status = command_check(fb_marketing, config_gen(start_date=...))
assert status.status == Status.SUCCEEDED
assert command_check(
fb_marketing, config_gen(end_date=...)
) == AirbyteConnectionStatus(status=Status.SUCCEEDED, message=None)
assert command_check(
fb_marketing, config_gen(end_date="")
) == AirbyteConnectionStatus(status=Status.SUCCEEDED, message=None)
assert command_check(fb_marketing, config_gen(end_date=...)) == AirbyteConnectionStatus(status=Status.SUCCEEDED, message=None)
assert command_check(fb_marketing, config_gen(end_date="")) == AirbyteConnectionStatus(status=Status.SUCCEEDED, message=None)

View File

@@ -23,12 +23,7 @@ from source_facebook_marketing.streams.streams import fetch_thumbnail_data_url
def test_filter_all_statuses(api, mocker, some_config):
mocker.patch.multiple(FBMarketingStream, __abstractmethods__=set())
expected = {}
assert (
FBMarketingStream(
api=api, account_ids=some_config["account_ids"]
)._filter_all_statuses()
== expected
)
assert FBMarketingStream(api=api, account_ids=some_config["account_ids"])._filter_all_statuses() == expected
expected = {
"filtering": [
@@ -76,9 +71,7 @@ def test_filter_all_statuses(api, mocker, some_config):
],
)
def test_fetch_thumbnail_data_url(url, requests_mock):
requests_mock.get(
url, status_code=200, headers={"content-type": "content-type"}, content=b""
)
requests_mock.get(url, status_code=200, headers={"content-type": "content-type"}, content=b"")
assert fetch_thumbnail_data_url(url) == "data:content-type;base64,"
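The assertion pins down the expected contract: a successful response is inlined as a base64 data URL built from the `content-type` header and the body. A sketch consistent with that contract (illustrative, not the function's actual source):

```python
import base64
from typing import Optional

import requests


def fetch_thumbnail_data_url_sketch(url: str) -> Optional[str]:
    # Download the thumbnail and inline it as a data URL; with the empty
    # body mocked above this yields exactly "data:content-type;base64,".
    response = requests.get(url)
    if response.status_code == requests.codes.ok:
        encoded = base64.b64encode(response.content).decode()
        return f"data:{response.headers['content-type']};base64,{encoded}"
    return None
```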
@@ -122,9 +115,7 @@ def test_parse_call_rate_header():
],
],
)
def test_ads_insights_breakdowns(
class_name, breakdowns, action_breakdowns, some_config
):
def test_ads_insights_breakdowns(class_name, breakdowns, action_breakdowns, some_config):
kwargs = {
"api": None,
"account_ids": some_config["account_ids"],
@@ -145,9 +136,7 @@ def test_custom_ads_insights_breakdowns(some_config):
"end_date": pendulum.now(),
"insights_lookback_window": 1,
}
stream = AdsInsights(
breakdowns=["mmm"], action_breakdowns=["action_destination"], **kwargs
)
stream = AdsInsights(breakdowns=["mmm"], action_breakdowns=["action_destination"], **kwargs)
assert stream.breakdowns == ["mmm"]
assert stream.action_breakdowns == ["action_destination"]
@@ -159,12 +148,7 @@ def test_custom_ads_insights_breakdowns(some_config):
"action_destination",
]
stream = AdsInsights(
breakdowns=[],
action_breakdowns=[],
action_breakdowns_allow_empty=True,
**kwargs
)
stream = AdsInsights(breakdowns=[], action_breakdowns=[], action_breakdowns_allow_empty=True, **kwargs)
assert stream.breakdowns == []
assert stream.action_breakdowns == []
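Taken together, the two cases describe a simple resolution rule for action breakdowns. A hedged rendering, where `defaults` stands in for the stream's built-in list (truncated in this excerpt):

```python
def resolve_action_breakdowns(requested, allow_empty, defaults):
    # An explicit selection wins; an empty one falls back to the stream's
    # defaults unless `action_breakdowns_allow_empty` lets it stay empty.
    if requested:
        return list(requested)
    return [] if allow_empty else list(defaults)
```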

View File

@@ -23,18 +23,13 @@ TODAY = pendulum.local(2023, 3, 31)
"start_date",
pendulum.local(2019, 1, 1),
pendulum.local(2020, 3, 2),
[
f"The start date cannot be beyond 37 months from the current date. "
f"Set start date to {pendulum.local(2020, 3, 2)}."
],
[f"The start date cannot be beyond 37 months from the current date. " f"Set start date to {pendulum.local(2020, 3, 2)}."],
),
(
"start_date",
TODAY + pendulum.duration(months=1),
TODAY,
[
f"The start date cannot be in the future. Set start date to today's date - {TODAY}."
],
[f"The start date cannot be in the future. Set start date to today's date - {TODAY}."],
),
(
"end_date",

View File

@@ -0,0 +1,37 @@
# Facebook Marketing Migration Guide
## Upgrading to 2.0.0
The Ads-Insights-* streams now have updated schemas.
### Update Custom Insights Reports (this step can be skipped if you did not define any)
1. Select **Sources** in the main navbar.
2. Select the Facebook Marketing Connector.
3. Select **Retest saved source**.
4. Remove unsupported fields from the list in the Custom Insights section.
5. Select **Test and Save**.
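If you maintain several custom reports, step 4 can be scripted. A minimal sketch, assuming `supported_fields` is the set of field names accepted by the 2.0.0 spec (the helper name is hypothetical):

```python
def prune_custom_insight(insight: dict, supported_fields: set) -> dict:
    # Drop any field the upgraded connector no longer accepts; the rest of
    # the saved Custom Insights definition is left untouched.
    pruned = dict(insight)
    pruned["fields"] = [f for f in insight.get("fields", []) if f in supported_fields]
    return pruned
```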
### Refresh affected schemas and reset data
1. Select **Connections** in the main navbar.
   1. Select the connection(s) affected by the update.
2. Select the **Replication** tab.
   1. Select **Refresh source schema**.
   2. Select **OK**.

:::note
Any detected schema changes will be listed for your review.
:::

3. Select **Save changes** at the bottom of the page.
   1. Ensure the **Reset affected streams** option is checked.

:::note
Depending on destination type you may not be prompted to reset your data.
:::

4. Select **Save connection**.

:::note
This will reset the data in your destination and initiate a fresh sync.
:::
For more information on resetting your data in Airbyte, see [this page](https://docs.airbyte.com/operator-guides/reset).

View File

@@ -190,7 +190,7 @@ The Facebook Marketing connector uses the `lookback_window` parameter to repeate
## Data type mapping
| Integration Type | Airbyte Type |
| :--------------: | :----------: |
|:----------------:|:------------:|
| string | string |
| number | number |
| array | array |
@@ -200,6 +200,7 @@ The Facebook Marketing connector uses the `lookback_window` parameter to repeate
| Version | Date | Pull Request | Subject |
|:--------|:-----------|:---------------------------------------------------------|:--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 2.0.0 | 2024-03-01 | [35746](https://github.com/airbytehq/airbyte/pull/35746) | Update API to `v19.0` |
| 1.4.2 | 2024-02-22 | [35539](https://github.com/airbytehq/airbyte/pull/35539) | Add missing config migration from `include_deleted` field |
| 1.4.1 | 2024-02-21 | [35467](https://github.com/airbytehq/airbyte/pull/35467) | Fix error with incorrect state transforming in the 1.4.0 version |
| 1.4.0 | 2024-02-20 | [32449](https://github.com/airbytehq/airbyte/pull/32449) | Replace "Include Deleted Campaigns, Ads, and AdSets" option in configuration with specific statuses selection per stream |