1
0
mirror of synced 2025-12-25 02:09:19 -05:00

fix(source-google-ads): handle ServiceUnavailableError with retries (#62494)

This commit is contained in:
Christo Grabowski
2025-07-03 12:18:06 -07:00
committed by GitHub
parent 7cd8dd2b78
commit adc91ad6d5
6 changed files with 120 additions and 7 deletions

View File

@@ -11,7 +11,7 @@ data:
connectorSubtype: api
connectorType: source
definitionId: 253487c0-2246-43ba-a21f-5116b20a2c50
dockerImageTag: 3.9.0-rc.2
dockerImageTag: 3.9.0-rc.3
dockerRepository: airbyte/source-google-ads
documentationUrl: https://docs.airbyte.com/integrations/sources/google-ads
githubIssueLabel: source-google-ads

View File

@@ -3,7 +3,7 @@ requires = [ "poetry-core>=1.0.0",]
build-backend = "poetry.core.masonry.api"
[tool.poetry]
version = "3.9.0-rc.2"
version = "3.9.0-rc.3"
name = "source-google-ads"
description = "Source implementation for Google Ads."
authors = [ "Airbyte <contact@airbyte.io>",]

View File

@@ -9,7 +9,7 @@ from typing import Any, Iterable, Iterator, List, Mapping, MutableMapping
import backoff
from google.ads.googleads.client import GoogleAdsClient
from google.ads.googleads.v18.services.types.google_ads_service import GoogleAdsRow, SearchGoogleAdsResponse
from google.api_core.exceptions import InternalServerError, ServerError, TooManyRequests
from google.api_core.exceptions import InternalServerError, ServerError, ServiceUnavailable, TooManyRequests
from google.auth import exceptions
from google.protobuf import json_format
from google.protobuf.message import Message
@@ -26,7 +26,7 @@ API_VERSION = "v18"
def on_give_up(details):
error = details["exception"]
if isinstance(error, InternalServerError):
if isinstance(error, (InternalServerError, ServiceUnavailable)):
raise AirbyteTracedException(
failure_type=FailureType.transient_error,
message=f"{error.message} {error.details}",
@@ -72,9 +72,24 @@ class GoogleAds:
message = "The authentication to Google Ads has expired. Re-authenticate to restore access to Google Ads."
raise AirbyteTracedException(message=message, failure_type=FailureType.config_error) from e
def get_accessible_accounts(self):
@backoff.on_exception(
backoff.expo,
(InternalServerError, ServerError, ServiceUnavailable, TooManyRequests),
on_backoff=lambda details: logger.info(
f"Caught retryable error after {details['tries']} tries. Waiting {details['wait']} seconds then retrying..."
),
on_giveup=on_give_up,
max_tries=5,
)
def _get_accessible_customers(self):
"""Internal method to get accessible customers with retry logic"""
customer_resource_names = self.customer_service.list_accessible_customers().resource_names
logger.info(f"Found {len(customer_resource_names)} accessible accounts: {customer_resource_names}")
return customer_resource_names
def get_accessible_accounts(self):
"""Get accessible customer accounts with retry logic"""
customer_resource_names = self._get_accessible_customers()
for customer_resource_name in customer_resource_names:
customer_id = self.ga_service().parse_customer_path(customer_resource_name)["customer_id"]
@@ -82,7 +97,7 @@ class GoogleAds:
@backoff.on_exception(
backoff.expo,
(InternalServerError, ServerError, TooManyRequests),
(InternalServerError, ServerError, ServiceUnavailable, TooManyRequests),
on_backoff=lambda details: logger.info(
f"Caught retryable error after {details['tries']} tries. Waiting {details['wait']} seconds then retrying..."
),

View File

@@ -13,6 +13,7 @@ from google.auth import exceptions
from source_google_ads.google_ads import GoogleAds
from source_google_ads.streams import chunk_date_range
from airbyte_cdk.models import FailureType
from airbyte_cdk.utils import AirbyteTracedException
from .common import MockGoogleAdsClient, MockGoogleAdsService
@@ -70,6 +71,62 @@ def test_google_ads_wrong_permissions(mocker):
assert e.value.message == expected_message
def test_get_accessible_accounts_retry_on_service_unavailable(mocker):
"""Test that get_accessible_accounts retries on ServiceUnavailable errors"""
from google.api_core.exceptions import ServiceUnavailable
mocker.patch("time.sleep") # Mock sleep to speed up test
mocker.patch("source_google_ads.google_ads.GoogleAdsClient.load_from_dict", return_value=MockGoogleAdsClient(SAMPLE_CONFIG))
google_ads_client = GoogleAds(**SAMPLE_CONFIG)
# Mock the _get_accessible_customers method to fail first, then succeed
mock_customer_service = mocker.Mock()
mock_customer_service.list_accessible_customers.side_effect = [
ServiceUnavailable("Service is currently unavailable"), # First call fails
mocker.Mock(resource_names=["customers/123", "customers/456"]), # Second call succeeds
]
google_ads_client.customer_service = mock_customer_service
# Mock the ga_service to return a mock that can parse customer paths
mock_ga_service = mocker.Mock()
mock_ga_service.parse_customer_path.side_effect = [{"customer_id": "123"}, {"customer_id": "456"}]
google_ads_client.ga_services["default"] = mock_ga_service
# This should retry and eventually succeed
customer_ids = list(google_ads_client.get_accessible_accounts())
# Verify it was called twice (once failed, once succeeded)
assert mock_customer_service.list_accessible_customers.call_count == 2
assert customer_ids == ["123", "456"]
def test_get_accessible_accounts_gives_up_after_max_retries(mocker):
"""Test that get_accessible_accounts gives up after max retries on ServiceUnavailable"""
from google.api_core.exceptions import ServiceUnavailable
from airbyte_cdk.utils import AirbyteTracedException
mocker.patch("time.sleep") # Mock sleep to speed up test
mocker.patch("source_google_ads.google_ads.GoogleAdsClient.load_from_dict", return_value=MockGoogleAdsClient(SAMPLE_CONFIG))
google_ads_client = GoogleAds(**SAMPLE_CONFIG)
# Mock the customer service to always fail with ServiceUnavailable
mock_customer_service = mocker.Mock()
mock_customer_service.list_accessible_customers.side_effect = ServiceUnavailable("Service is currently unavailable")
google_ads_client.customer_service = mock_customer_service
# This should retry 5 times then give up
with pytest.raises(AirbyteTracedException) as e:
list(google_ads_client.get_accessible_accounts())
# Verify it was called 5 times (max retries)
assert mock_customer_service.list_accessible_customers.call_count == 5
assert "Service is currently unavailable" in e.value.message
assert e.value.failure_type == FailureType.transient_error
def test_send_request(mocker, customers):
mocker.patch("source_google_ads.google_ads.GoogleAdsClient.load_from_dict", return_value=MockGoogleAdsClient(SAMPLE_CONFIG))
mocker.patch("source_google_ads.google_ads.GoogleAdsClient.get_service", return_value=MockGoogleAdsService())

View File

@@ -9,7 +9,14 @@ import pytest
from google.ads.googleads.errors import GoogleAdsException
from google.ads.googleads.v18.errors.types.errors import ErrorCode, GoogleAdsError, GoogleAdsFailure
from google.ads.googleads.v18.errors.types.request_error import RequestErrorEnum
from google.api_core.exceptions import DataLoss, InternalServerError, ResourceExhausted, TooManyRequests, Unauthenticated
from google.api_core.exceptions import (
DataLoss,
InternalServerError,
ResourceExhausted,
ServiceUnavailable,
TooManyRequests,
Unauthenticated,
)
from grpc import RpcError
from source_google_ads.google_ads import GoogleAds
from source_google_ads.streams import AdGroup, ClickView, Customer, CustomerLabel
@@ -231,6 +238,39 @@ def test_retry_transient_errors(mocker, config, customers, error_cls):
assert records == []
def test_retry_503_raises_transient_error(mocker, config, customers):
customer_id = next(iter(customers)).id
mocker.patch("time.sleep")
credentials = config["credentials"]
credentials.update(use_proto_plus=True)
api = GoogleAds(credentials=credentials)
mocked_search = mocker.patch.object(
api.ga_services["default"], "search", side_effect=ServiceUnavailable("Service is currently unavailable")
)
incremental_stream_config = dict(
api=api,
conversion_window_days=config["conversion_window_days"],
start_date=config["start_date"],
end_date="2021-04-04",
customers=customers,
)
stream = ClickView(**incremental_stream_config)
stream_slice = {"customer_id": customer_id, "start_date": "2021-01-03", "end_date": "2021-01-04", "login_customer_id": "default"}
records = []
with pytest.raises(AirbyteTracedException) as exception:
records = list(stream.read_records(sync_mode=SyncMode.incremental, cursor_field=["segments.date"], stream_slice=stream_slice))
assert exception.value.internal_message == (
"Service is currently unavailable Unable to fetch data from Google Ads API due to "
"temporal error on the Google Ads server. Please retry again later. "
)
assert exception.value.failure_type == FailureType.transient_error
assert mocked_search.call_count == 5
assert records == []
def test_retry_500_raises_transient_error(mocker, config, customers):
customer_id = next(iter(customers)).id

View File

@@ -335,6 +335,7 @@ Due to a limitation in the Google Ads API which does not allow getting performan
| Version | Date | Pull Request | Subject |
|:-----------|:-----------|:---------------------------------------------------------|:-----------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 3.9.0-rc.3 | 2025-07-03 | [62494](https://github.com/airbytehq/airbyte/pull/62494) | Handle ServiceUnavailableErrors and retry on account check |
| 3.9.0-rc.2 | 2025-07-03 | [62505](https://github.com/airbytehq/airbyte/pull/62505) | Fix state migration for empty states |
| 3.9.0-rc.1 | 2025-06-18 | [61674](https://github.com/airbytehq/airbyte/pull/61674) | Migrate Campaign stream to Low Code |
| 3.8.2 | 2025-05-31 | [51664](https://github.com/airbytehq/airbyte/pull/51664) | Update dependencies |