1
0
mirror of synced 2025-12-25 02:09:19 -05:00

Source google ads: add more logs (#20755)

* #1148 source google ads: add more logs

* #1148 source google ads: upd changelog

* #1148 source google ads: flake fix

* #1148 source google ads - fix SATs

* #1148 source Google Ads: bump version

* #1148 source google ads: upd expected records

* auto-bump connector version

Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com>
This commit is contained in:
Denys Davydov
2023-01-10 19:59:05 +02:00
committed by GitHub
parent 8ed66ebe79
commit 58352c99e1
12 changed files with 465 additions and 122 deletions

View File

@@ -635,7 +635,7 @@
- name: Google Ads
sourceDefinitionId: 253487c0-2246-43ba-a21f-5116b20a2c50
dockerRepository: airbyte/source-google-ads
dockerImageTag: 0.2.6
dockerImageTag: 0.2.7
documentationUrl: https://docs.airbyte.com/integrations/sources/google-ads
icon: google-adwords.svg
sourceType: api

View File

@@ -5093,7 +5093,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-google-ads:0.2.6"
- dockerImage: "airbyte/source-google-ads:0.2.7"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/sources/google-ads"
connectionSpecification:

View File

@@ -13,5 +13,5 @@ COPY main.py ./
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]
LABEL io.airbyte.version=0.2.6
LABEL io.airbyte.version=0.2.7
LABEL io.airbyte.name=airbyte/source-google-ads

View File

@@ -19,27 +19,39 @@ acceptance_tests:
tests:
- config_path: "secrets/config.json"
expect_records:
bypass_reason: "Bypassed because SAT run on active account. Dedicated Sandbox is needed."
path: "integration_tests/expected_records.txt"
timeout_seconds: 600
empty_streams:
- name: "geographic_report"
bypass_reason: "The stream could not be fullfield with data manually."
- name: "keyword_report"
bypass_reason: "The stream could not be fullfield with data manually."
- name: "display_keyword_performance_report"
bypass_reason: "The stream could not be fullfield with data manually."
- name: "display_topics_performance_report"
bypass_reason: "The stream could not be fullfield with data manually."
- name: "shopping_performance_report"
bypass_reason: "The stream could not be fullfield with data manually."
- name: "unhappytable"
bypass_reason: "The stream could not be fullfield with data manually."
bypass_reason: "Stream not filled yet."
- name: "click_view"
bypass_reason: "The stream could not be fullfield with data manually."
timeout_seconds: 1200
bypass_reason: "Stream not filled yet."
- name: "unhappytable"
bypass_reason: "Stream not filled yet."
- name: "shopping_performance_report"
bypass_reason: "Stream not filled yet."
full_refresh:
tests:
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog.json"
timeout_seconds: 1200
incremental:
bypass_reason: "Incremental tests are implemented using custom test, available by integration_tests/test_incremental.py"
tests:
- config_path: "secrets/incremental_config.json"
configured_catalog_path: "integration_tests/incremental_catalog.json"
threshold_days: 14
future_state:
future_state_path: "integration_tests/abnormal_state.json"
cursor_paths:
account_performance_report: ["4651612872", "segments.date"]
click_view: ["4651612872", "segments.date"]
geographic_report: ["4651612872", "segments.date"]
keyword_report: ["4651612872", "segments.date"]
display_topics_performance_report: ["4651612872", "segments.date"]
shopping_performance_report: ["4651612872", "segments.date"]
ad_group_ads: ["4651612872", "segments.date"]
ad_groups: ["4651612872", "segments.date"]
accounts: ["4651612872", "segments.date"]
campaigns: ["4651612872", "segments.date"]
user_location_report: ["4651612872", "segments.date"]
ad_group_ad_report: ["4651612872", "segments.date"]
display_keyword_performance_report: ["4651612872", "segments.date"]

View File

@@ -0,0 +1,93 @@
[
{
"type": "STREAM",
"stream": {
"stream_state": { "4651612872": { "segments.date": "2222-01-01" }},
"stream_descriptor": { "name": "account_performance_report" }
}
},
{
"type": "STREAM",
"stream": {
"stream_state": { "4651612872": { "segments.date": "2222-01-01" }},
"stream_descriptor": { "name": "click_view" }
}
},
{
"type": "STREAM",
"stream": {
"stream_state": { "4651612872": { "segments.date": "2222-01-01" }},
"stream_descriptor": { "name": "geographic_report" }
}
},
{
"type": "STREAM",
"stream": {
"stream_state": { "4651612872": { "segments.date": "2222-01-01" }},
"stream_descriptor": { "name": "keyword_report" }
}
},
{
"type": "STREAM",
"stream": {
"stream_state": { "4651612872": { "segments.date": "2222-01-01" }},
"stream_descriptor": { "name": "display_topics_performance_report" }
}
},
{
"type": "STREAM",
"stream": {
"stream_state": { "4651612872": { "segments.date": "2222-01-01" }},
"stream_descriptor": { "name": "shopping_performance_report" }
}
},
{
"type": "STREAM",
"stream": {
"stream_state": { "4651612872": { "segments.date": "2222-01-01" }},
"stream_descriptor": { "name": "ad_group_ads" }
}
},
{
"type": "STREAM",
"stream": {
"stream_state": { "4651612872": { "segments.date": "2222-01-01" }},
"stream_descriptor": { "name": "ad_groups" }
}
},
{
"type": "STREAM",
"stream": {
"stream_state": { "4651612872": { "segments.date": "2222-01-01" }},
"stream_descriptor": { "name": "accounts" }
}
},
{
"type": "STREAM",
"stream": {
"stream_state": { "4651612872": { "segments.date": "2222-01-01" }},
"stream_descriptor": { "name": "campaigns" }
}
},
{
"type": "STREAM",
"stream": {
"stream_state": { "4651612872": { "segments.date": "2222-01-01" }},
"stream_descriptor": { "name": "user_location_report" }
}
},
{
"type": "STREAM",
"stream": {
"stream_state": { "4651612872": { "segments.date": "2222-01-01" }},
"stream_descriptor": { "name": "ad_group_ad_report" }
}
},
{
"type": "STREAM",
"stream": {
"stream_state": { "4651612872": { "segments.date": "2222-01-01" }},
"stream_descriptor": { "name": "display_keyword_performance_report" }
}
}
]

View File

@@ -169,6 +169,39 @@
"cursor_field": ["segments.date"],
"primary_key": [["campaign.id"], ["segments.date"]]
},
{
"stream": {
"name": "campaign_labels",
"json_schema": {},
"supported_sync_modes": ["full_refresh"],
"source_defined_primary_key": [["campaign_label.resource_name"]]
},
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite",
"primary_key": [["campaign_label.resource_name"]]
},
{
"stream": {
"name": "ad_group_labels",
"json_schema": {},
"supported_sync_modes": ["full_refresh"],
"source_defined_primary_key": [["ad_group_label.resource_name"]]
},
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite",
"primary_key": [["ad_group_label.resource_name"]]
},
{
"stream": {
"name": "ad_group_ad_labels",
"json_schema": {},
"supported_sync_modes": ["full_refresh"],
"source_defined_primary_key": [["ad_group_ad_label.resource_name"]]
},
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite",
"primary_key": [["ad_group_ad_label.resource_name"]]
},
{
"stream": {
"name": "user_location_report",
@@ -198,6 +231,15 @@
},
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite"
},
{
"stream": {
"name": "ad_group_custom",
"json_schema": {},
"supported_sync_modes": ["full_refresh"]
},
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite"
}
]
}

View File

@@ -1,16 +0,0 @@
{
"streams": [
{
"stream": {
"name": "ad_group_custom",
"json_schema": {},
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_cursor": true,
"default_cursor_field": ["segments.date"]
},
"sync_mode": "incremental",
"destination_sync_mode": "overwrite",
"cursor_field": ["segments.date"]
}
]
}

File diff suppressed because one or more lines are too long

View File

@@ -0,0 +1,173 @@
{
"streams": [
{
"stream": {
"name": "account_performance_report",
"json_schema": {},
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_cursor": true,
"default_cursor_field": ["segments.date"]
},
"sync_mode": "incremental",
"destination_sync_mode": "overwrite",
"cursor_field": ["segments.date"]
},
{
"stream": {
"name": "click_view",
"json_schema": {},
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_cursor": true,
"source_defined_primary_key": [["click_view.gclid"], ["segments.date"]],
"default_cursor_field": ["segments.date"]
},
"sync_mode": "incremental",
"destination_sync_mode": "overwrite",
"cursor_field": ["segments.date"],
"primary_key": [["click_view.gclid"], ["segments.date"]]
},
{
"stream": {
"name": "geographic_report",
"json_schema": {},
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_cursor": true,
"default_cursor_field": ["segments.date"]
},
"sync_mode": "incremental",
"destination_sync_mode": "overwrite",
"cursor_field": ["segments.date"]
},
{
"stream": {
"name": "keyword_report",
"json_schema": {},
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_cursor": true,
"default_cursor_field": ["segments.date"]
},
"sync_mode": "incremental",
"destination_sync_mode": "overwrite",
"cursor_field": ["segments.date"]
},
{
"stream": {
"name": "display_topics_performance_report",
"json_schema": {},
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_cursor": true,
"default_cursor_field": ["segments.date"]
},
"sync_mode": "incremental",
"destination_sync_mode": "overwrite",
"cursor_field": ["segments.date"]
},
{
"stream": {
"name": "shopping_performance_report",
"json_schema": {},
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_cursor": true,
"default_cursor_field": ["segments.date"]
},
"sync_mode": "incremental",
"destination_sync_mode": "overwrite",
"cursor_field": ["segments.date"]
},
{
"stream": {
"name": "ad_group_ads",
"json_schema": {},
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_cursor": true,
"source_defined_primary_key": [
["ad_group_ad.ad.id"],
["segments.date"]
],
"default_cursor_field": ["segments.date"]
},
"sync_mode": "incremental",
"destination_sync_mode": "overwrite",
"cursor_field": ["segments.date"],
"primary_key": [["ad_group_ad.ad.id"], ["segments.date"]]
},
{
"stream": {
"name": "ad_groups",
"json_schema": {},
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_cursor": true,
"default_cursor_field": ["segments.date"],
"source_defined_primary_key": [["ad_group.id"], ["segments.date"]]
},
"sync_mode": "incremental",
"destination_sync_mode": "overwrite",
"cursor_field": ["segments.date"],
"primary_key": [["ad_group.id"], ["segments.date"]]
},
{
"stream": {
"name": "accounts",
"json_schema": {},
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_cursor": true,
"default_cursor_field": ["segments.date"],
"source_defined_primary_key": [["customer.id"], ["segments.date"]]
},
"sync_mode": "incremental",
"destination_sync_mode": "overwrite",
"cursor_field": ["segments.date"],
"primary_key": [["customer.id"], ["segments.date"]]
},
{
"stream": {
"name": "campaigns",
"json_schema": {},
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_cursor": true,
"default_cursor_field": ["segments.date"],
"source_defined_primary_key": [["campaign.id"], ["segments.date"]]
},
"sync_mode": "incremental",
"destination_sync_mode": "overwrite",
"cursor_field": ["segments.date"],
"primary_key": [["campaign.id"], ["segments.date"]]
},
{
"stream": {
"name": "user_location_report",
"json_schema": {},
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_cursor": true,
"default_cursor_field": ["segments.date"]
},
"sync_mode": "incremental",
"destination_sync_mode": "overwrite",
"cursor_field": ["segments.date"]
},
{
"stream": {
"name": "ad_group_ad_report",
"json_schema": {},
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_cursor": true,
"default_cursor_field": ["segments.date"]
},
"sync_mode": "incremental",
"destination_sync_mode": "overwrite",
"cursor_field": ["segments.date"]
},
{
"stream": {
"name": "display_keyword_performance_report",
"json_schema": {},
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_cursor": true,
"default_cursor_field": ["segments.date"]
},
"sync_mode": "incremental",
"destination_sync_mode": "overwrite",
"cursor_field": ["segments.date"]
}
]
}

View File

@@ -1,87 +0,0 @@
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#
import logging
import pendulum
import pytest
from airbyte_cdk.models import ConfiguredAirbyteCatalog, Type
from source_google_ads.source import SourceGoogleAds
@pytest.fixture
def configured_catalog():
    """Provide a configured catalog that contains a single incremental stream.

    The catalog holds only ``ad_group_ad_report``, configured for an
    incremental sync on the ``segments.date`` cursor with the ``overwrite``
    destination sync mode.
    """
    cursor_path = ["segments.date"]
    report_stream = {
        "name": "ad_group_ad_report",
        "json_schema": {},
        "supported_sync_modes": ["full_refresh", "incremental"],
        "source_defined_cursor": True,
        # Independent copies so the two cursor lists never alias each other.
        "default_cursor_field": list(cursor_path),
    }
    configured_stream = {
        "stream": report_stream,
        "sync_mode": "incremental",
        "destination_sync_mode": "overwrite",
        "cursor_field": list(cursor_path),
    }
    return {"streams": [configured_stream]}
# Tolerance, in days, subtracted from the start date / saved state when the
# tests below check the lower bound of the records' cursor values.
GAP_DAYS = 14
def test_incremental_sync(config, configured_catalog):
    """Check that two consecutive incremental reads respect the cursor bounds.

    The first read runs without any state: every record's ``segments.date``
    must lie between the configured start date (minus ``GAP_DAYS``) and the
    last state emitted by that read. The second read is seeded with that
    state: records may not be older than the state minus ``GAP_DAYS`` and the
    emitted states may not move backwards.

    :param config: connector config; ``customer_id`` and ``start_date`` are read here
    :param configured_catalog: catalog dict, parsed into ``ConfiguredAirbyteCatalog``
    """
    stream_name = "ad_group_ad_report"
    cursor_field = "segments.date"
    customer_id = config["customer_id"]
    logger = logging.getLogger("airbyte")
    catalog = ConfiguredAirbyteCatalog.parse_obj(configured_catalog)
    google_ads_client = SourceGoogleAds()

    # Materialise the messages: they are scanned twice (state lookup, then record checks).
    messages = list(google_ads_client.read(logger, config, catalog))

    # The last STATE message carries the final cursor value of the first sync.
    latest_state = next(
        (
            message.state.data[stream_name][customer_id][cursor_field]
            for message in reversed(messages)
            if message and message.type == Type.STATE
        ),
        None,
    )
    # Fail with a readable message instead of a TypeError (or a vacuous pass)
    # when the first sync produced no state at all.
    assert latest_state is not None, "the first sync did not emit any STATE message"

    # Loop-invariant lower bound for the first sync.
    earliest_allowed = pendulum.parse(config["start_date"]).subtract(days=GAP_DAYS).to_date_string()
    for message in messages:
        if not message or message.type != Type.RECORD:
            continue
        cursor_value = message.record.data[cursor_field]
        assert cursor_value <= latest_state
        assert cursor_value >= earliest_allowed

    # next sync
    seeded_state = {stream_name: {customer_id: {cursor_field: latest_state}}}
    # Loop-invariant lower bound for the second sync.
    oldest_expected = pendulum.parse(latest_state).subtract(days=GAP_DAYS).to_date_string()
    for message in google_ads_client.read(logger, config, catalog, seeded_state):
        # Same falsy-message guard as in the first pass.
        if not message:
            continue
        if message.type == Type.RECORD:
            assert message.record.data[cursor_field] >= oldest_expected
        if message.type == Type.STATE:
            assert message.state.data[stream_name][customer_id][cursor_field] >= latest_state
def test_abnormally_large_state(config, configured_catalog):
    """Read with a far-future state and expect STATE messages but no RECORD messages.

    NOTE(review): the state passed here is keyed directly by the cursor field,
    whereas test_incremental_sync nests the cursor under the customer id --
    confirm that this shape is intentional.
    """
    far_future_state = {"ad_group_ad_report": {"segments.date": "2222-06-04"}}
    source = SourceGoogleAds()
    messages = source.read(
        logging.getLogger("airbyte"),
        config,
        ConfiguredAirbyteCatalog.parse_obj(configured_catalog),
        far_future_state,
    )

    # Single pass over the message stream, remembering which message types showed up.
    seen_types = {message.type for message in messages if message}

    assert Type.RECORD not in seen_types
    assert Type.STATE in seen_types

View File

@@ -105,6 +105,7 @@ class GoogleAdsStream(Stream, ABC):
yield {"customer_id": customer.id}
def read_records(self, sync_mode, stream_slice: Optional[Mapping[str, Any]] = None, **kwargs) -> Iterable[Mapping[str, Any]]:
self.logger.info(f"Read records using g-ads client. Stream slice is {stream_slice}")
if stream_slice is None:
return []
@@ -164,6 +165,7 @@ class IncrementalGoogleAdsStream(GoogleAdsStream, IncrementalMixin, ABC):
start_date = self._start_date
end_date = self._end_date
self.logger.info(f"Generating slices for customer {customer.id}. Start date is {start_date}, end date is {end_date}")
for chunk in chunk_date_range(
start_date=start_date,
@@ -176,6 +178,7 @@ class IncrementalGoogleAdsStream(GoogleAdsStream, IncrementalMixin, ABC):
):
if chunk:
chunk["customer_id"] = customer.id
self.logger.info(f"Next slice is {chunk}")
yield chunk
def read_records(
@@ -186,6 +189,7 @@ class IncrementalGoogleAdsStream(GoogleAdsStream, IncrementalMixin, ABC):
and update `start_date` key in the `stream_slice` with the latest read record's cursor value, then retry the sync.
"""
while True:
self.logger.info("Starting a while loop iteration")
customer_id = stream_slice and stream_slice["customer_id"]
try:
records = super().read_records(sync_mode, stream_slice=stream_slice)
@@ -196,16 +200,20 @@ class IncrementalGoogleAdsStream(GoogleAdsStream, IncrementalMixin, ABC):
date_in_latest_record = pendulum.parse(record[self.cursor_field])
cursor_value = (max(date_in_current_stream, date_in_latest_record)).to_date_string()
self.state = {customer_id: {self.cursor_field: cursor_value}}
self.logger.info(f"Updated state for customer {customer_id}. Full state is {self.state}.")
yield record
continue
self.state = {customer_id: {self.cursor_field: record[self.cursor_field]}}
self.logger.info(f"Initialized state for customer {customer_id}. Full state is {self.state}.")
yield record
continue
except GoogleAdsException as exception:
self.logger.info(f"Caught a GoogleAdsException: {str(exception)}")
error = next(iter(exception.failure.errors))
if error.error_code.request_error == RequestErrorEnum.RequestError.EXPIRED_PAGE_TOKEN:
start_date, end_date = parse_dates(stream_slice)
current_state = self.current_state(customer_id)
self.logger.info(f"Start date is {start_date}. End date is {end_date}. Current state is {current_state}")
if (end_date - start_date).days == 1:
# If the range is a single day there is no point in retrying, because that is already the minimum date range
self.logger.error("Page token has expired.")
@@ -217,11 +225,13 @@ class IncrementalGoogleAdsStream(GoogleAdsStream, IncrementalMixin, ABC):
raise exception
# Retry reading records from where it crashed
stream_slice["start_date"] = self.current_state(customer_id, default=stream_slice["start_date"])
self.logger.info(f"Retry reading records from where it crushed with a modified slice: {stream_slice}")
else:
# raise caught error for other error statuses
raise exception
else:
# return the control if no exception is raised
self.logger.info("Current slice has been read. Exiting read_records()")
return
def get_query(self, stream_slice: Mapping[str, Any] = None) -> str:

View File

@@ -134,6 +134,7 @@ Due to a limitation in the Google Ads API which does not allow getting performan
| Version | Date | Pull Request | Subject |
|:---------|:-----------|:---------------------------------------------------------|:-------------------------------------------------------------------------------------------------------------------------------------|
| `0.2.7` | 2023-01-10 | [20755](https://github.com/airbytehq/airbyte/pull/20755) | Add more logs to debug stuck syncs |
| `0.2.6` | 2022-12-22 | [20855](https://github.com/airbytehq/airbyte/pull/20855) | Retry 429 and 5xx errors |
| `0.2.5` | 2022-11-22 | [19700](https://github.com/airbytehq/airbyte/pull/19700) | Fix schema for `campaigns` stream |
| `0.2.4` | 2022-11-09 | [19208](https://github.com/airbytehq/airbyte/pull/19208) | Add TypeTransformer to Campaigns stream to force proper type casting |