
feat(source-google-analytics): migrate to low-code (#60342)

Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com>
Authored by Serhii Lazebnyi on 2025-06-06 12:42:43 +02:00, committed by GitHub
parent 108b82dff4, commit 952c8f3bbc
33 changed files with 4194 additions and 6662 deletions

View File

@@ -6,7 +6,7 @@
"name": "daily_active_users"
},
"stream_state": {
"date": "20990101"
"date": "2099-01-01"
}
}
},
@@ -17,7 +17,7 @@
"name": "weekly_active_users"
},
"stream_state": {
"date": "20990101"
"date": "2099-01-01"
}
}
},
@@ -28,7 +28,7 @@
"name": "four_weekly_active_users"
},
"stream_state": {
"date": "20990101"
"date": "2099-01-01"
}
}
},
@@ -39,7 +39,7 @@
"name": "devices"
},
"stream_state": {
"date": "20990101"
"date": "2099-01-01"
}
}
},
@@ -50,7 +50,7 @@
"name": "locations"
},
"stream_state": {
"date": "20990101"
"date": "2099-01-01"
}
}
},
@@ -61,7 +61,7 @@
"name": "pages"
},
"stream_state": {
"date": "20990101"
"date": "2099-01-01"
}
}
},
@@ -72,7 +72,7 @@
"name": "traffic_sources"
},
"stream_state": {
"date": "20990101"
"date": "2099-01-01"
}
}
},
@@ -83,7 +83,7 @@
"name": "website_overview"
},
"stream_state": {
"date": "20990101"
"date": "2099-01-01"
}
}
},
@@ -94,7 +94,7 @@
"name": "user_acquisition_first_user_medium_report"
},
"stream_state": {
"date": "20990101"
"date": "2099-01-01"
}
}
},
@@ -105,7 +105,7 @@
"name": "user_acquisition_first_user_source_report"
},
"stream_state": {
"date": "20990101"
"date": "2099-01-01"
}
}
},
@@ -116,7 +116,7 @@
"name": "user_acquisition_first_user_source_medium_report"
},
"stream_state": {
"date": "20990101"
"date": "2099-01-01"
}
}
},
@@ -127,7 +127,7 @@
"name": "user_acquisition_first_user_source_platform_report"
},
"stream_state": {
"date": "20990101"
"date": "2099-01-01"
}
}
},
@@ -138,7 +138,7 @@
"name": "user_acquisition_first_user_campaign_report"
},
"stream_state": {
"date": "20990101"
"date": "2099-01-01"
}
}
},
@@ -149,7 +149,7 @@
"name": "user_acquisition_first_user_google_ads_ad_network_type_report"
},
"stream_state": {
"date": "20990101"
"date": "2099-01-01"
}
}
},
@@ -160,7 +160,7 @@
"name": "user_acquisition_first_user_google_ads_ad_group_name_report"
},
"stream_state": {
"date": "20990101"
"date": "2099-01-01"
}
}
},
@@ -171,7 +171,7 @@
"name": "traffic_acquisition_session_source_medium_report"
},
"stream_state": {
"date": "20990101"
"date": "2099-01-01"
}
}
},
@@ -182,7 +182,7 @@
"name": "traffic_acquisition_session_medium_report"
},
"stream_state": {
"date": "20990101"
"date": "2099-01-01"
}
}
},
@@ -193,7 +193,7 @@
"name": "traffic_acquisition_session_source_report"
},
"stream_state": {
"date": "20990101"
"date": "2099-01-01"
}
}
},
@@ -204,7 +204,7 @@
"name": "traffic_acquisition_session_campaign_report"
},
"stream_state": {
"date": "20990101"
"date": "2099-01-01"
}
}
},
@@ -215,7 +215,7 @@
"name": "traffic_acquisition_session_default_channel_grouping_report"
},
"stream_state": {
"date": "20990101"
"date": "2099-01-01"
}
}
},
@@ -226,7 +226,7 @@
"name": "traffic_acquisition_session_source_platform_report"
},
"stream_state": {
"date": "20990101"
"date": "2099-01-01"
}
}
},
@@ -237,7 +237,7 @@
"name": "events_report"
},
"stream_state": {
"date": "20990101"
"date": "2099-01-01"
}
}
},
@@ -248,7 +248,7 @@
"name": "conversions_report"
},
"stream_state": {
"date": "20990101"
"date": "2099-01-01"
}
}
},
@@ -259,7 +259,7 @@
"name": "pages_title_and_screen_class_report"
},
"stream_state": {
"date": "20990101"
"date": "2099-01-01"
}
}
},
@@ -270,7 +270,7 @@
"name": "pages_path_report"
},
"stream_state": {
"date": "20990101"
"date": "2099-01-01"
}
}
},
@@ -281,7 +281,7 @@
"name": "pages_title_and_screen_name_report"
},
"stream_state": {
"date": "20990101"
"date": "2099-01-01"
}
}
},
@@ -292,7 +292,7 @@
"name": "content_group_report"
},
"stream_state": {
"date": "20990101"
"date": "2099-01-01"
}
}
},
@@ -303,7 +303,7 @@
"name": "ecommerce_purchases_item_id_report"
},
"stream_state": {
"date": "20990101"
"date": "2099-01-01"
}
}
},
@@ -314,7 +314,7 @@
"name": "demographic_country_report"
},
"stream_state": {
"date": "20990101"
"date": "2099-01-01"
}
}
},
@@ -325,7 +325,7 @@
"name": "demographic_region_report"
},
"stream_state": {
"date": "20990101"
"date": "2099-01-01"
}
}
},
@@ -336,7 +336,7 @@
"name": "demographic_city_report"
},
"stream_state": {
"date": "20990101"
"date": "2099-01-01"
}
}
},
@@ -347,7 +347,7 @@
"name": "demographic_language_report"
},
"stream_state": {
"date": "20990101"
"date": "2099-01-01"
}
}
},
@@ -358,7 +358,7 @@
"name": "tech_browser_report"
},
"stream_state": {
"date": "20990101"
"date": "2099-01-01"
}
}
},
@@ -369,7 +369,7 @@
"name": "tech_device_category_report"
},
"stream_state": {
"date": "20990101"
"date": "2099-01-01"
}
}
},
@@ -380,7 +380,7 @@
"name": "tech_device_model_report"
},
"stream_state": {
"date": "20990101"
"date": "2099-01-01"
}
}
},
@@ -391,7 +391,7 @@
"name": "tech_screen_resolution_report"
},
"stream_state": {
"date": "20990101"
"date": "2099-01-01"
}
}
},
@@ -402,7 +402,7 @@
"name": "tech_app_version_report"
},
"stream_state": {
"date": "20990101"
"date": "2099-01-01"
}
}
},
@@ -413,7 +413,7 @@
"name": "tech_platform_report"
},
"stream_state": {
"date": "20990101"
"date": "2099-01-01"
}
}
},
@@ -424,7 +424,7 @@
"name": "tech_platform_device_category_report"
},
"stream_state": {
"date": "20990101"
"date": "2099-01-01"
}
}
},
@@ -435,7 +435,7 @@
"name": "tech_operating_system_report"
},
"stream_state": {
"date": "20990101"
"date": "2099-01-01"
}
}
},
@@ -446,7 +446,7 @@
"name": "tech_os_with_version_report"
},
"stream_state": {
"date": "20990101"
"date": "2099-01-01"
}
}
},
@@ -457,7 +457,7 @@
"name": "demographic_interest_report"
},
"stream_state": {
"date": "20990101"
"date": "2099-01-01"
}
}
},
@@ -468,7 +468,7 @@
"name": "ecommerce_purchases_item_category_report"
},
"stream_state": {
"date": "20990101"
"date": "2099-01-01"
}
}
},
@@ -479,7 +479,7 @@
"name": "ecommerce_purchases_item_category_report_combined"
},
"stream_state": {
"date": "20990101"
"date": "2099-01-01"
}
}
},
@@ -490,7 +490,7 @@
"name": "ecommerce_purchases_item_category_3_report"
},
"stream_state": {
"date": "20990101"
"date": "2099-01-01"
}
}
},
@@ -501,7 +501,7 @@
"name": "publisher_ads_ad_unit_report"
},
"stream_state": {
"date": "20990101"
"date": "2099-01-01"
}
}
},
@@ -512,7 +512,7 @@
"name": "demographic_age_report"
},
"stream_state": {
"date": "20990101"
"date": "2099-01-01"
}
}
},
@@ -523,7 +523,7 @@
"name": "publisher_ads_ad_format_report"
},
"stream_state": {
"date": "20990101"
"date": "2099-01-01"
}
}
},
@@ -534,7 +534,7 @@
"name": "ecommerce_purchases_item_category_4_report"
},
"stream_state": {
"date": "20990101"
"date": "2099-01-01"
}
}
},
@@ -545,7 +545,7 @@
"name": "ecommerce_purchases_item_category_5_report"
},
"stream_state": {
"date": "20990101"
"date": "2099-01-01"
}
}
},
@@ -556,7 +556,7 @@
"name": "ecommerce_purchases_item_category_2_report"
},
"stream_state": {
"date": "20990101"
"date": "2099-01-01"
}
}
},
@@ -567,7 +567,7 @@
"name": "ecommerce_purchases_item_name_report"
},
"stream_state": {
"date": "20990101"
"date": "2099-01-01"
}
}
},
@@ -578,7 +578,7 @@
"name": "publisher_ads_page_path_report"
},
"stream_state": {
"date": "20990101"
"date": "2099-01-01"
}
}
},
@@ -589,7 +589,7 @@
"name": "demographic_gender_report"
},
"stream_state": {
"date": "20990101"
"date": "2099-01-01"
}
}
},
@@ -600,7 +600,7 @@
"name": "ecommerce_purchases_item_brand_report"
},
"stream_state": {
"date": "20990101"
"date": "2099-01-01"
}
}
},
@@ -611,7 +611,7 @@
"name": "publisher_ads_ad_source_report"
},
"stream_state": {
"date": "20990101"
"date": "2099-01-01"
}
}
},
@@ -622,7 +622,7 @@
"name": "weekly_events_report"
},
"stream_state": {
"yearWeek": "209915"
"yearWeek": "2099-01-05"
}
}
}
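The expected-state values above switch from the Data API's compact `%Y%m%d` cursor format to ISO `%Y-%m-%d` dates (the `yearWeek` cursor is likewise replaced by a plain date). A minimal sketch of that reformatting using only the standard library; the helper name is illustrative and not part of the connector:

from datetime import datetime

def to_iso_cursor(value: str) -> str:
    # convert a legacy "%Y%m%d" cursor value, e.g. "20990101", into "%Y-%m-%d"
    return datetime.strptime(value, "%Y%m%d").strftime("%Y-%m-%d")

assert to_iso_cursor("20990101") == "2099-01-01"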

View File

@@ -12,7 +12,7 @@ data:
connectorSubtype: api
connectorType: source
definitionId: 3cc2eafd-84aa-4dca-93af-322d9dfeec1a
dockerImageTag: 2.7.7
dockerImageTag: 2.8.0-rc.1
dockerRepository: airbyte/source-google-analytics-data-api
documentationUrl: https://docs.airbyte.com/integrations/sources/google-analytics-data-api
githubIssueLabel: source-google-analytics-data-api
@@ -31,6 +31,8 @@ data:
enabled: true
releaseStage: generally_available
releases:
rolloutConfiguration:
enableProgressiveRollout: true
breakingChanges:
2.0.0:
message: Version 2.0.0 introduces changes to stream names for those syncing more than one Google Analytics 4 property. It allows streams from all properties to sync successfully. Please upgrade the connector to enable this additional functionality.
@@ -49,7 +51,7 @@ data:
supportLevel: certified
tags:
- language:python
- cdk:python
- cdk:low-code
connectorTestSuitesOptions:
# Running regression and acceptance at the same time can cause quota consumption problems
# - suite: liveTests

File diff suppressed because it is too large

View File

@@ -3,7 +3,7 @@ requires = [ "poetry-core>=1.0.0",]
build-backend = "poetry.core.masonry.api"
[tool.poetry]
version = "2.7.7"
version = "2.8.0-rc.1"
name = "source-google-analytics-data-api"
description = "Source implementation for Google Analytics Data Api."
authors = [ "Airbyte <contact@airbyte.io>",]
@@ -17,11 +17,7 @@ include = "source_google_analytics_data_api"
[tool.poetry.dependencies]
python = "^3.10,<3.12"
cryptography = "==42.0.5"
requests = "==2.31.0"
airbyte-cdk = "^5.0.0"
PyJWT = "==2.8.0"
pandas = "^2.2.0"
airbyte-cdk = "^6"
[tool.poetry.scripts]
source-google-analytics-data-api = "source_google_analytics_data_api.run:run"

View File

@@ -5,4 +5,5 @@
from .source import SourceGoogleAnalyticsDataApi
__all__ = ["SourceGoogleAnalyticsDataApi"]

View File

@@ -1,196 +0,0 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
import logging
from functools import wraps
from typing import Any, Iterable, Mapping, Optional
import requests
from requests.exceptions import JSONDecodeError
from airbyte_cdk.sources.streams.http.error_handlers.response_models import ResponseAction
from .utils import API_LIMIT_PER_HOUR
class GoogleAnalyticsApiQuotaBase:
# Airbyte Logger
logger = logging.getLogger("airbyte")
# initial quota placeholder
initial_quota: Optional[Mapping[str, Any]] = None
# the % value cutoff, crossing which will trigger
# setting the scenario values for attrs prior to the 429 error
treshold: float = 0.1
# base attrs
response_action: ResponseAction = ResponseAction.RETRY
backoff_time: Optional[int] = None
# stop making new slices globally
stop_iter: bool = False
error_message = None
# mapping with scenarios for each quota kind
quota_mapping: Mapping[str, Any] = {
"concurrentRequests": {
"error_pattern": "Exhausted concurrent requests quota.",
"backoff": 30,
"response_action": ResponseAction.RETRY,
"stop_iter": False,
},
"tokensPerProjectPerHour": {
"error_pattern": "Exhausted property tokens for a project per hour.",
"backoff": 1800,
"response_action": ResponseAction.RETRY,
"stop_iter": False,
"error_message": API_LIMIT_PER_HOUR,
},
"potentiallyThresholdedRequestsPerHour": {
"error_pattern": "Exhausted potentially thresholded requests quota.",
"backoff": 1800,
"response_action": ResponseAction.RETRY,
"stop_iter": False,
"error_message": API_LIMIT_PER_HOUR,
},
# TODO: The next scenarios are commented out for now.
# Once we hit one of them at least once,
# we should uncomment the one that matches the criteria
# and fill in the `error_pattern` to track that quota as well.
# IMPORTANT: PLEASE DO NOT REMOVE the scenarios below!
# 'tokensPerDay': {
# 'error_pattern': "___",
# "backoff": None,
# "response_action": ResponseAction.FAIL,
# "stop_iter": True,
# },
# 'tokensPerHour': {
# 'error_pattern': "___",
# "backoff": 1800,
# "response_action": ResponseAction.RETRY,
# "stop_iter": False,
# },
# 'serverErrorsPerProjectPerHour': {
# 'error_pattern': "___",
# "backoff": 3600,
# "response_action": ResponseAction.RETRY,
# "stop_iter": False,
# },
}
def _get_known_quota_list(self) -> Iterable[str]:
return self.quota_mapping.keys()
def _get_initial_quota_value(self, quota_name: str) -> int:
init_remaining = self.initial_quota.get(quota_name).get("remaining")
# before the 429 is hit the `remaining` could become -1 or 0
return 1 if init_remaining <= 0 else init_remaining
def _get_quota_name_from_error_message(self, error_msg: str) -> Optional[str]:
for quota, value in self.quota_mapping.items():
if value.get("error_pattern") in error_msg:
return quota
return None
def _get_known_quota_from_response(self, property_quota: Mapping[str, Any]) -> Mapping[str, Any]:
current_quota = {}
for quota in property_quota.keys():
if quota in self._get_known_quota_list():
current_quota.update(**{quota: property_quota.get(quota)})
return current_quota
def _set_retry_attrs_for_quota(self, quota_name: str) -> None:
quota = self.quota_mapping.get(quota_name, {})
if quota:
self.response_action = quota.get("response_action")
self.stop_iter = quota.get("stop_iter")
self.backoff_time = quota.get("backoff")
self.error_message = quota.get("error_message", quota.get("error_pattern"))
def _set_default_handle_error_attrs(self) -> None:
self.response_action = ResponseAction.RETRY
self.backoff_time = None
self.stop_iter = False
def _set_initial_quota(self, current_quota: Optional[Mapping[str, Any]] = None) -> None:
if not self.initial_quota:
self.initial_quota = current_quota
def _check_remaining_quota(self, current_quota: Mapping[str, Any]) -> None:
for quota_name, quota_value in current_quota.items():
total_available = self._get_initial_quota_value(quota_name)
remaining: int = quota_value.get("remaining")
remaining_percent: float = remaining / total_available
# stop early if we hit a quota that is about to run out
if remaining_percent <= self.treshold:
self.logger.warning(f"The `{quota_name}` quota is running out of tokens. Available {remaining} out of {total_available}.")
self._set_retry_attrs_for_quota(quota_name)
return None
elif self.error_message:
self.logger.warning(self.error_message)
def _check_for_errors(self, response: requests.Response) -> None:
try:
# revert to default values after a successful retry
self._set_default_handle_error_attrs()
error = response.json().get("error")
if error:
quota_name = self._get_quota_name_from_error_message(error.get("message"))
if quota_name:
self._set_retry_attrs_for_quota(quota_name)
self.logger.warn(f"The `{quota_name}` quota is exceeded!")
return None
except (AttributeError, JSONDecodeError) as attr_e:
self.logger.warning(
f"`GoogleAnalyticsApiQuota._check_for_errors`: Received non JSON response from the API. Full error: {attr_e}. Bypassing."
)
pass
except Exception as e:
self.logger.fatal(f"Other `GoogleAnalyticsApiQuota` error: {e}")
raise
class GoogleAnalyticsApiQuota(GoogleAnalyticsApiQuotaBase):
def _check_quota(self, response: requests.Response):
# try get json from response
try:
parsed_response = response.json()
except (AttributeError, JSONDecodeError) as e:
self.logger.warning(
f"`GoogleAnalyticsApiQuota._check_quota`: Received non JSON response from the API. Full error: {e}. Bypassing."
)
parsed_response = {}
# get current quota
property_quota: dict = parsed_response.get("propertyQuota")
if property_quota:
# return default attrs values once successfully retried
# or until another 429 error is hit
self._set_default_handle_error_attrs()
# reduce quota list to known kinds only
current_quota = self._get_known_quota_from_response(property_quota)
if current_quota:
# save the initial quota
self._set_initial_quota(current_quota)
# check for remaining quota
self._check_remaining_quota(current_quota)
else:
self._check_for_errors(response)
def handle_quota(self) -> None:
"""
The function decorator is used to integrate with the `interpret_response` method,
or any other method that provides early access to the `response` object.
"""
def decorator(func):
@wraps(func)
def wrapper_handle_quota(*args, **kwargs):
# find the requests.Response inside args list
for arg in args:
response = arg if isinstance(arg, requests.models.Response) else None
# check for the quota
self._check_quota(response)
# return actual function
return func(*args, **kwargs)
return wrapper_handle_quota
return decorator
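The early-stop check in `_check_remaining_quota` above compares the remaining share of each known quota against the 10% cutoff stored in `treshold`. A quick illustration with made-up quota numbers in the `propertyQuota` shape:

# made-up quota status; real responses expose a `propertyQuota` object per quota kind
initial_quota = {"tokensPerProjectPerHour": {"consumed": 0, "remaining": 1000}}
current_quota = {"tokensPerProjectPerHour": {"consumed": 920, "remaining": 80}}

total_available = initial_quota["tokensPerProjectPerHour"]["remaining"]
remaining = current_quota["tokensPerProjectPerHour"]["remaining"]

# 80 / 1000 = 0.08 <= 0.1, so the handler would switch to the retry scenario for this
# quota kind (RETRY with a 1800-second backoff) before the API ever returns a 429
assert remaining / total_available <= 0.1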

View File

@@ -1,72 +0,0 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
import datetime
import jwt
import requests
from source_google_analytics_data_api import utils
class GoogleServiceKeyAuthenticator(requests.auth.AuthBase):
_google_oauth2_token_endpoint = "https://oauth2.googleapis.com/token"
_google_oauth2_scope_endpoint = "https://www.googleapis.com/auth/analytics.readonly"
_google_oauth2_grant_type_urn = "urn:ietf:params:oauth:grant-type:jwt-bearer"
_default_token_lifetime_secs = 3600
_jwt_encode_algorithm = "RS256"
def __init__(self, credentials: dict):
self._client_email = credentials["client_email"]
self._client_secret = credentials["private_key"]
self._client_id = credentials["client_id"]
self._token: dict = {}
def _get_claims(self) -> dict:
now = datetime.datetime.utcnow()
expiry = now + datetime.timedelta(seconds=self._default_token_lifetime_secs)
return {
"iss": self._client_email,
"scope": self._google_oauth2_scope_endpoint,
"aud": self._google_oauth2_token_endpoint,
"exp": utils.datetime_to_secs(expiry),
"iat": utils.datetime_to_secs(now),
}
def _get_headers(self):
headers = {}
if self._client_id:
headers["kid"] = self._client_id
return headers
def _get_signed_payload(self) -> dict:
claims = self._get_claims()
headers = self._get_headers()
assertion = jwt.encode(claims, self._client_secret, headers=headers, algorithm=self._jwt_encode_algorithm)
return {"grant_type": self._google_oauth2_grant_type_urn, "assertion": str(assertion)}
def _token_expired(self):
if not self._token:
return True
return self._token["expires_at"] < utils.datetime_to_secs(datetime.datetime.utcnow())
def _rotate(self):
if self._token_expired():
try:
response = requests.request(method="POST", url=self._google_oauth2_token_endpoint, params=self._get_signed_payload()).json()
except requests.exceptions.RequestException as e:
raise Exception(f"Error refreshing access token: {e}") from e
self._token = dict(
**response,
expires_at=utils.datetime_to_secs(datetime.datetime.utcnow() + datetime.timedelta(seconds=response["expires_in"])),
)
def __call__(self, r: requests.Request) -> requests.Request:
self._rotate()
r.headers["Authorization"] = f"Bearer {self._token['access_token']}"
return r

View File

@@ -0,0 +1,69 @@
#
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
#
from dataclasses import dataclass
from itertools import islice
from typing import Any, Iterable, List, MutableMapping
import requests
from airbyte_cdk.sources.declarative.extractors.record_extractor import RecordExtractor
@dataclass
class CombinedExtractor(RecordExtractor):
"""
Extractor that merges the output of multiple sub-extractors into a single record.
This extractor takes a list of `RecordExtractor` instances (`extractors`), each of which
independently extracts records from the response. For each response, the extractor:
1. Invokes each sub-extractor to generate iterables of records.
2. Zips the results together, so that the first record from each extractor is combined,
the second from each, and so on.
3. Merges each group of records into a single dictionary using `dict.update()`.
The result is a sequence of dictionaries where each dictionary contains the merged keys
and values from the corresponding records across all extractors.
Example:
keys_extractor -> yields: [{"name": "Alice", "age": 30}]
extra_data_extractor -> yields: [{"country": "US"}]
CombinedExtractor(extractors=[keys_extractor, extra_data_extractor]) ->
yields: [{"name": "Alice", "age": 30, "country": "US"}]
"""
extractors: List[RecordExtractor]
def extract_records(self, response: requests.Response) -> Iterable[MutableMapping[Any, Any]]:
extractors_records = [extractor.extract_records(response) for extractor in self.extractors]
for records in zip(*extractors_records):
merged = {}
for record in records:
merged.update(record) # merge all fields
yield merged
@dataclass
class KeyValueExtractor(RecordExtractor):
"""
Extractor that combines keys and values from two separate extractors.
The `keys_extractor` and `values_extractor` extract records independently
from the response. Their outputs are zipped together to form key-value mappings.
Each key from `keys_extractor` should correspond to a key in the resulting dictionary,
and each value from `values_extractor` is the value for that key.
Example:
keys_extractor -> yields: ["name", "age"]
values_extractor -> yields: ["Alice", 30]
result: { "name": "Alice", "age": 30 }
"""
keys_extractor: RecordExtractor
values_extractor: RecordExtractor
def extract_records(self, response: requests.Response) -> Iterable[MutableMapping[Any, Any]]:
keys = self.keys_extractor.extract_records(response)
values = self.values_extractor.extract_records(response)
yield dict(zip(keys, values))
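A short sketch of the behaviour the two new extractors document above, using plain in-memory records instead of real sub-extractors or HTTP responses:

# CombinedExtractor semantics: zip the sub-extractors' record streams and
# merge each group of records into a single dict
keys_records = [{"name": "Alice", "age": 30}]
extra_records = [{"country": "US"}]

combined = []
for group in zip(keys_records, extra_records):
    merged = {}
    for record in group:
        merged.update(record)
    combined.append(merged)
assert combined == [{"name": "Alice", "age": 30, "country": "US"}]

# KeyValueExtractor semantics: one extractor yields the keys, the other the values,
# and the two streams are zipped into a single mapping
keys = ["name", "age"]
values = ["Alice", 30]
assert dict(zip(keys, values)) == {"name": "Alice", "age": 30}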

View File

@@ -1,240 +0,0 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
import logging
from typing import Any, List, Mapping
import dpath.util
import orjson
from airbyte_cdk.config_observation import create_connector_config_control_message
from airbyte_cdk.entrypoint import AirbyteEntrypoint
from airbyte_cdk.models import AirbyteMessageSerializer
from airbyte_cdk.sources.message import InMemoryMessageRepository, MessageRepository
from .source import SourceGoogleAnalyticsDataApi
logger = logging.getLogger("airbyte_logger")
class MigratePropertyID:
"""
This class stands for migrating the config at runtime,
while providing the backward compatibility when falling back to the previous source version.
Specifically, starting from `1.3.0`, the `property_id` property should be like :
> List(["<property_id 1>", "<property_id 2>", ..., "<property_id n>"])
instead of, in `1.2.0`:
> JSON STR: "<property_id>"
"""
message_repository: MessageRepository = InMemoryMessageRepository()
migrate_from_key: str = "property_id"
migrate_to_key: str = "property_ids"
@classmethod
def _should_migrate(cls, config: Mapping[str, Any]) -> bool:
"""
This method determines whether config requires migration.
Returns:
> True, if the transformation is necessary
> False, otherwise.
"""
if cls.migrate_from_key in config:
return True
return False
@classmethod
def _transform_to_array(cls, config: Mapping[str, Any], source: SourceGoogleAnalyticsDataApi = None) -> Mapping[str, Any]:
# assign old values to new property that will be used within the new version
config[cls.migrate_to_key] = config[cls.migrate_to_key] if cls.migrate_to_key in config else []
data = config.pop(cls.migrate_from_key)
if data not in config[cls.migrate_to_key]:
config[cls.migrate_to_key].append(data)
return config
@classmethod
def _modify_and_save(cls, config_path: str, source: SourceGoogleAnalyticsDataApi, config: Mapping[str, Any]) -> Mapping[str, Any]:
# modify the config
migrated_config = cls._transform_to_array(config, source)
# save the config
source.write_config(migrated_config, config_path)
# return modified config
return migrated_config
@classmethod
def _emit_control_message(cls, migrated_config: Mapping[str, Any]) -> None:
# add the Airbyte Control Message to message repo
cls.message_repository.emit_message(create_connector_config_control_message(migrated_config))
# emit the Airbyte Control Message from message queue to stdout
for message in cls.message_repository.consume_queue():
print(orjson.dumps(AirbyteMessageSerializer.dump(message)).decode())
@classmethod
def migrate(cls, args: List[str], source: SourceGoogleAnalyticsDataApi) -> None:
"""
This method checks the input args, should the config be migrated,
transform if necessary and emit the CONTROL message.
"""
# get config path
config_path = AirbyteEntrypoint(source).extract_config(args)
# proceed only if `--config` arg is provided
if config_path:
# read the existing config
config = source.read_config(config_path)
# migration check
if cls._should_migrate(config):
cls._emit_control_message(
cls._modify_and_save(config_path, source, config),
)
class MigrateCustomReports:
"""
This class stands for migrating the config at runtime,
while providing the backward compatibility when falling back to the previous source version.
Specifically, starting from `1.3.3`, the `custom_reports` property should be like :
> List([{name: my_report}, {dimensions: [a,b,c]}], [], ...)
instead of, in `1.3.2`:
> JSON STR: "{name: my_report}, {dimensions: [a,b,c]}"
"""
message_repository: MessageRepository = InMemoryMessageRepository()
migrate_from_key: str = "custom_reports"
migrate_to_key: str = "custom_reports_array"
@classmethod
def _should_migrate(cls, config: Mapping[str, Any]) -> bool:
"""
This method determines whether the config should be migrated to have the new structure for the `custom_reports`,
based on the source spec.
Returns:
> True, if the transformation is necessary
> False, otherwise.
> Raises the Exception if the structure could not be migrated.
"""
# If the config has been migrated and has entries, no need to migrate again.
if config.get(cls.migrate_to_key, []):
return False
# If the old config key is present and its value is a string, migration is needed.
if config.get(cls.migrate_from_key, None) is not None and isinstance(config[cls.migrate_from_key], str):
return True
return False
@classmethod
def _transform_to_array(cls, config: Mapping[str, Any], source: SourceGoogleAnalyticsDataApi = None) -> Mapping[str, Any]:
# assign old values to new property that will be used within the new version
config[cls.migrate_to_key] = config[cls.migrate_from_key]
# transform `json_str` to `list` of objects
return source._validate_custom_reports(config)
@classmethod
def _modify_and_save(cls, config_path: str, source: SourceGoogleAnalyticsDataApi, config: Mapping[str, Any]) -> Mapping[str, Any]:
# modify the config
migrated_config = cls._transform_to_array(config, source)
# save the config
source.write_config(migrated_config, config_path)
# return modified config
return migrated_config
@classmethod
def _emit_control_message(cls, migrated_config: Mapping[str, Any]) -> None:
# add the Airbyte Control Message to message repo
cls.message_repository.emit_message(create_connector_config_control_message(migrated_config))
# emit the Airbyte Control Message from message queue to stdout
for message in cls.message_repository.consume_queue():
print(orjson.dumps(AirbyteMessageSerializer.dump(message)).decode())
@classmethod
def migrate(cls, args: List[str], source: SourceGoogleAnalyticsDataApi) -> None:
"""
This method checks the input args, should the config be migrated,
transform if necessary and emit the CONTROL message.
"""
# get config path
config_path = AirbyteEntrypoint(source).extract_config(args)
# proceed only if `--config` arg is provided
if config_path:
# read the existing config
config = source.read_config(config_path)
# migration check
if cls._should_migrate(config):
cls._emit_control_message(
cls._modify_and_save(config_path, source, config),
)
class MigrateCustomReportsCohortSpec:
"""
This class stands for migrating the config at runtime,
Specifically, starting from `2.1.0`, the `cohortSpec` property will be added to `custom_reports_array` with an `enabled` flag:
> List([{name: my_report, "cohortSpec": { "enabled": "true" } }, ...])
"""
message_repository: MessageRepository = InMemoryMessageRepository()
@classmethod
def _should_migrate(cls, config: Mapping[str, Any]) -> bool:
"""
This method determines whether the config should be migrated to have the new structure for the `cohortSpec` inside `custom_reports`,
based on the source spec.
Returns:
> True, if the transformation is necessary
> False, otherwise.
"""
return not dpath.util.search(config, "custom_reports_array/**/cohortSpec/enabled")
@classmethod
def _transform_custom_reports_cohort_spec(
cls,
config: Mapping[str, Any],
) -> Mapping[str, Any]:
"""Assign `enabled` property that will be used within the new version"""
for report in config.get("custom_reports_array", []):
if report.get("cohortSpec"):
report["cohortSpec"]["enabled"] = "true"
else:
report.setdefault("cohortSpec", {})["enabled"] = "false"
return config
@classmethod
def _modify_and_save(cls, config_path: str, source: SourceGoogleAnalyticsDataApi, config: Mapping[str, Any]) -> Mapping[str, Any]:
# modify the config
migrated_config = cls._transform_custom_reports_cohort_spec(config)
# save the config
source.write_config(migrated_config, config_path)
# return modified config
return migrated_config
@classmethod
def _emit_control_message(cls, migrated_config: Mapping[str, Any]) -> None:
# add the Airbyte Control Message to message repo
cls.message_repository.emit_message(create_connector_config_control_message(migrated_config))
# emit the Airbyte Control Message from message queue to stdout
for message in cls.message_repository.consume_queue():
print(orjson.dumps(AirbyteMessageSerializer.dump(message)).decode())
@classmethod
def migrate(cls, args: List[str], source: SourceGoogleAnalyticsDataApi) -> None:
"""
This method checks the input args, should the config be migrated,
transform if necessary and emit the CONTROL message.
"""
# get config path
config_path = AirbyteEntrypoint(source).extract_config(args)
# proceed only if `--config` arg is provided
if config_path:
# read the existing config
config = source.read_config(config_path)
# migration check
if cls._should_migrate(config):
cls._emit_control_message(
cls._modify_and_save(config_path, source, config),
)
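Taken together, the three migration classes above rewrite an old-style config in place. A sketch of the before/after shapes they are meant to produce, with illustrative values; note that `_transform_to_array` pops `property_id` but leaves the legacy `custom_reports` string in place:

# pre-migration config (illustrative values)
old_config = {
    "property_id": "123456789",
    "custom_reports": '[{"name": "my_report", "dimensions": ["date"], "metrics": ["totalUsers"]}]',
}

# shape expected after MigratePropertyID, MigrateCustomReports and
# MigrateCustomReportsCohortSpec have run (legacy `custom_reports` key omitted here)
new_config = {
    "property_ids": ["123456789"],
    "custom_reports_array": [
        {
            "name": "my_report",
            "dimensions": ["date"],
            "metrics": ["totalUsers"],
            # cohortSpec gains an explicit flag: "false" when no cohortSpec was configured
            "cohortSpec": {"enabled": "false"},
        }
    ],
}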

View File

@@ -1,138 +0,0 @@
{
"type": "array",
"items": {
"type": "object",
"required": ["name", "dimensions", "metrics"],
"properties": {
"name": {
"type": "string"
},
"dimensions": {
"type": "array",
"minItems": 1,
"items": {
"type": "string"
}
},
"metrics": {
"type": "array",
"minItems": 1,
"items": {
"type": "string"
}
},
"cohortSpec": {
"type": ["null", "object"],
"properties": {
"cohorts": {
"type": ["null", "array"],
"items": {
"type": ["null", "object"],
"properties": {
"name": {
"type": ["null", "string"]
},
"dimension": {
"type": "string",
"enum": ["firstSessionDate"]
},
"dateRange": {
"type": "object",
"properties": {
"startDate": {
"type": ["null", "string"]
},
"endDate": {
"type": ["null", "string"]
},
"name": {
"type": ["null", "string"]
}
}
}
}
}
},
"cohortsRange": {
"type": "object",
"required": ["granularity", "endOffset"],
"properties": {
"granularity": {
"type": "string",
"enum": ["DAILY", "WEEKLY", "MONTHLY"]
},
"startOffset": {
"type": ["null", "integer"]
},
"endOffset": {
"type": "integer"
}
}
},
"cohortReportSettings": {
"type": ["null", "object"],
"properties": {
"accumulate": {
"type": ["null", "boolean"]
}
}
}
}
},
"pivots": {
"type": ["null", "array"],
"items": {
"type": "object",
"required": ["limit"],
"properties": {
"fieldNames": {
"type": ["null", "array"],
"items": {
"type": "string"
}
},
"orderBys": {
"type": ["null", "array"],
"items": {
"type": "object",
"properties": {
"desc": {
"type": ["null", "boolean"]
},
"pivot": {
"type": "object",
"properties": {
"metricName": {
"type": "string"
},
"pivotSelections": {
"type": "array",
"items": {
"type": "object",
"properties": {
"dimensionName": {
"type": "string"
},
"dimensionValue": {
"type": "string"
}
}
}
}
}
}
}
}
},
"offset": { "type": ["null", "string", "integer"] },
"limit": { "type": ["string", "integer"] },
"metricAggregations": {
"type": ["null", "string"],
"enum": ["COUNT", "TOTAL", "MAXIMUM", "MINIMUM"]
}
}
}
}
}
}
}

View File

@@ -1,689 +0,0 @@
[
{
"name": "daily_active_users",
"dimensions": ["date"],
"metrics": ["active1DayUsers"]
},
{
"name": "weekly_active_users",
"dimensions": ["date"],
"metrics": ["active7DayUsers"]
},
{
"name": "four_weekly_active_users",
"dimensions": ["date"],
"metrics": ["active28DayUsers"]
},
{
"name": "devices",
"dimensions": ["date", "deviceCategory", "operatingSystem", "browser"],
"metrics": [
"totalUsers",
"newUsers",
"sessions",
"sessionsPerUser",
"averageSessionDuration",
"screenPageViews",
"screenPageViewsPerSession",
"bounceRate"
]
},
{
"name": "locations",
"dimensions": ["region", "country", "city", "date"],
"metrics": [
"totalUsers",
"newUsers",
"sessions",
"sessionsPerUser",
"averageSessionDuration",
"screenPageViews",
"screenPageViewsPerSession",
"bounceRate"
]
},
{
"name": "pages",
"dimensions": ["date", "hostName", "pagePathPlusQueryString"],
"metrics": ["screenPageViews", "bounceRate"]
},
{
"name": "traffic_sources",
"dimensions": ["date", "sessionSource", "sessionMedium"],
"metrics": [
"totalUsers",
"newUsers",
"sessions",
"sessionsPerUser",
"averageSessionDuration",
"screenPageViews",
"screenPageViewsPerSession",
"bounceRate"
]
},
{
"name": "website_overview",
"dimensions": ["date"],
"metrics": [
"totalUsers",
"newUsers",
"sessions",
"sessionsPerUser",
"averageSessionDuration",
"screenPageViews",
"screenPageViewsPerSession",
"bounceRate"
]
},
{
"name": "user_acquisition_first_user_medium_report",
"dimensions": ["date", "firstUserMedium"],
"metrics": [
"newUsers",
"engagedSessions",
"engagementRate",
"eventCount",
"conversions",
"totalRevenue",
"totalUsers",
"userEngagementDuration"
]
},
{
"name": "user_acquisition_first_user_source_report",
"dimensions": ["date", "firstUserSource"],
"metrics": [
"newUsers",
"engagedSessions",
"engagementRate",
"eventCount",
"conversions",
"totalRevenue",
"totalUsers",
"userEngagementDuration"
]
},
{
"name": "user_acquisition_first_user_source_medium_report",
"dimensions": ["date", "firstUserSource", "firstUserMedium"],
"metrics": [
"newUsers",
"engagedSessions",
"engagementRate",
"eventCount",
"conversions",
"totalRevenue",
"totalUsers",
"userEngagementDuration"
]
},
{
"name": "user_acquisition_first_user_source_platform_report",
"dimensions": ["date", "firstUserSourcePlatform"],
"metrics": [
"newUsers",
"engagedSessions",
"engagementRate",
"eventCount",
"conversions",
"totalRevenue",
"totalUsers",
"userEngagementDuration"
]
},
{
"name": "user_acquisition_first_user_campaign_report",
"dimensions": ["date", "firstUserCampaignName"],
"metrics": [
"newUsers",
"engagedSessions",
"engagementRate",
"eventCount",
"conversions",
"totalRevenue",
"totalUsers",
"userEngagementDuration"
]
},
{
"name": "user_acquisition_first_user_google_ads_ad_network_type_report",
"dimensions": ["date", "firstUserGoogleAdsAdNetworkType"],
"metrics": [
"newUsers",
"engagedSessions",
"engagementRate",
"eventCount",
"conversions",
"totalRevenue",
"totalUsers",
"userEngagementDuration"
]
},
{
"name": "user_acquisition_first_user_google_ads_ad_group_name_report",
"dimensions": ["date", "firstUserGoogleAdsAdGroupName"],
"metrics": [
"newUsers",
"engagedSessions",
"engagementRate",
"eventCount",
"conversions",
"totalRevenue",
"totalUsers",
"userEngagementDuration"
]
},
{
"name": "traffic_acquisition_session_source_medium_report",
"dimensions": ["date", "sessionSource", "sessionMedium"],
"metrics": [
"totalUsers",
"sessions",
"engagedSessions",
"eventsPerSession",
"engagementRate",
"eventCount",
"conversions",
"totalRevenue",
"userEngagementDuration"
]
},
{
"name": "traffic_acquisition_session_medium_report",
"dimensions": ["date", "sessionMedium"],
"metrics": [
"totalUsers",
"sessions",
"engagedSessions",
"eventsPerSession",
"engagementRate",
"eventCount",
"conversions",
"totalRevenue",
"userEngagementDuration"
]
},
{
"name": "traffic_acquisition_session_source_report",
"dimensions": ["date", "sessionSource"],
"metrics": [
"totalUsers",
"sessions",
"engagedSessions",
"eventsPerSession",
"engagementRate",
"eventCount",
"conversions",
"totalRevenue",
"userEngagementDuration"
]
},
{
"name": "traffic_acquisition_session_campaign_report",
"dimensions": ["date", "sessionCampaignName"],
"metrics": [
"totalUsers",
"sessions",
"engagedSessions",
"eventsPerSession",
"engagementRate",
"eventCount",
"conversions",
"totalRevenue",
"userEngagementDuration"
]
},
{
"name": "traffic_acquisition_session_default_channel_grouping_report",
"dimensions": ["date", "sessionDefaultChannelGrouping"],
"metrics": [
"totalUsers",
"sessions",
"engagedSessions",
"eventsPerSession",
"engagementRate",
"eventCount",
"conversions",
"totalRevenue",
"userEngagementDuration"
]
},
{
"name": "traffic_acquisition_session_source_platform_report",
"dimensions": ["date", "sessionSourcePlatform"],
"metrics": [
"totalUsers",
"sessions",
"engagedSessions",
"eventsPerSession",
"engagementRate",
"eventCount",
"conversions",
"totalRevenue",
"userEngagementDuration"
]
},
{
"name": "events_report",
"dimensions": ["date", "eventName"],
"metrics": ["eventCount", "totalUsers", "eventCountPerUser", "totalRevenue"]
},
{
"name": "weekly_events_report",
"dimensions": ["yearWeek", "eventName"],
"metrics": ["eventCount", "totalUsers", "eventCountPerUser", "totalRevenue"]
},
{
"name": "conversions_report",
"dimensions": ["date", "eventName"],
"metrics": ["conversions", "totalUsers", "totalRevenue"]
},
{
"name": "pages_title_and_screen_class_report",
"dimensions": ["date", "unifiedScreenClass"],
"metrics": [
"screenPageViews",
"totalUsers",
"newUsers",
"eventCount",
"conversions",
"totalRevenue",
"userEngagementDuration"
]
},
{
"name": "pages_path_report",
"dimensions": ["date", "pagePath"],
"metrics": [
"screenPageViews",
"totalUsers",
"newUsers",
"eventCount",
"conversions",
"totalRevenue",
"userEngagementDuration"
]
},
{
"name": "pages_title_and_screen_name_report",
"dimensions": ["date", "unifiedScreenName"],
"metrics": [
"screenPageViews",
"totalUsers",
"newUsers",
"eventCount",
"conversions",
"totalRevenue",
"userEngagementDuration"
]
},
{
"name": "content_group_report",
"dimensions": ["date", "contentGroup"],
"metrics": [
"screenPageViews",
"totalUsers",
"newUsers",
"eventCount",
"conversions",
"totalRevenue",
"userEngagementDuration"
]
},
{
"name": "ecommerce_purchases_item_name_report",
"dimensions": ["date", "itemName"],
"metrics": [
"cartToViewRate",
"purchaseToViewRate",
"itemsPurchased",
"itemRevenue",
"itemsAddedToCart",
"itemsViewed"
]
},
{
"name": "ecommerce_purchases_item_id_report",
"dimensions": ["date", "itemId"],
"metrics": [
"cartToViewRate",
"purchaseToViewRate",
"itemsPurchased",
"itemRevenue",
"itemsAddedToCart",
"itemsViewed"
]
},
{
"name": "ecommerce_purchases_item_category_report_combined",
"dimensions": [
"date",
"itemCategory",
"itemCategory2",
"itemCategory3",
"itemCategory4",
"itemCategory5"
],
"metrics": [
"cartToViewRate",
"purchaseToViewRate",
"itemsPurchased",
"itemRevenue",
"itemsAddedToCart",
"itemsViewed"
]
},
{
"name": "ecommerce_purchases_item_category_report",
"dimensions": ["date", "itemCategory"],
"metrics": [
"cartToViewRate",
"purchaseToViewRate",
"itemsPurchased",
"itemRevenue",
"itemsAddedToCart",
"itemsViewed"
]
},
{
"name": "ecommerce_purchases_item_category_2_report",
"dimensions": ["date", "itemCategory2"],
"metrics": [
"cartToViewRate",
"purchaseToViewRate",
"itemsPurchased",
"itemRevenue",
"itemsAddedToCart",
"itemsViewed"
]
},
{
"name": "ecommerce_purchases_item_category_3_report",
"dimensions": ["date", "itemCategory3"],
"metrics": [
"cartToViewRate",
"purchaseToViewRate",
"itemsPurchased",
"itemRevenue",
"itemsAddedToCart",
"itemsViewed"
]
},
{
"name": "ecommerce_purchases_item_category_4_report",
"dimensions": ["date", "itemCategory4"],
"metrics": [
"cartToViewRate",
"purchaseToViewRate",
"itemsPurchased",
"itemRevenue",
"itemsAddedToCart",
"itemsViewed"
]
},
{
"name": "ecommerce_purchases_item_category_5_report",
"dimensions": ["date", "itemCategory5"],
"metrics": [
"cartToViewRate",
"purchaseToViewRate",
"itemsPurchased",
"itemRevenue",
"itemsAddedToCart",
"itemsViewed"
]
},
{
"name": "ecommerce_purchases_item_brand_report",
"dimensions": ["date", "itemBrand"],
"metrics": [
"cartToViewRate",
"purchaseToViewRate",
"itemsPurchased",
"itemRevenue",
"itemsAddedToCart",
"itemsViewed"
]
},
{
"name": "publisher_ads_ad_unit_report",
"dimensions": ["date", "adUnitName"],
"metrics": [
"publisherAdImpressions",
"adUnitExposure",
"publisherAdClicks",
"totalAdRevenue"
]
},
{
"name": "publisher_ads_page_path_report",
"dimensions": ["date", "pagePath"],
"metrics": [
"publisherAdImpressions",
"adUnitExposure",
"publisherAdClicks",
"totalAdRevenue"
]
},
{
"name": "publisher_ads_ad_format_report",
"dimensions": ["date", "adFormat"],
"metrics": [
"publisherAdImpressions",
"adUnitExposure",
"publisherAdClicks",
"totalAdRevenue"
]
},
{
"name": "publisher_ads_ad_source_report",
"dimensions": ["date", "adSourceName"],
"metrics": [
"publisherAdImpressions",
"adUnitExposure",
"publisherAdClicks",
"totalAdRevenue"
]
},
{
"name": "demographic_country_report",
"dimensions": ["date", "country"],
"metrics": [
"totalUsers",
"newUsers",
"engagedSessions",
"engagementRate",
"conversions",
"totalRevenue"
]
},
{
"name": "demographic_region_report",
"dimensions": ["date", "region"],
"metrics": [
"totalUsers",
"newUsers",
"engagedSessions",
"engagementRate",
"conversions",
"totalRevenue"
]
},
{
"name": "demographic_city_report",
"dimensions": ["date", "city"],
"metrics": [
"totalUsers",
"newUsers",
"engagedSessions",
"engagementRate",
"conversions",
"totalRevenue"
]
},
{
"name": "demographic_language_report",
"dimensions": ["date", "language"],
"metrics": [
"totalUsers",
"newUsers",
"engagedSessions",
"engagementRate",
"conversions",
"totalRevenue"
]
},
{
"name": "demographic_age_report",
"dimensions": ["date", "userAgeBracket"],
"metrics": [
"totalUsers",
"newUsers",
"engagedSessions",
"engagementRate",
"conversions",
"totalRevenue"
]
},
{
"name": "demographic_gender_report",
"dimensions": ["date", "userGender"],
"metrics": [
"totalUsers",
"newUsers",
"engagedSessions",
"engagementRate",
"conversions",
"totalRevenue"
]
},
{
"name": "demographic_interest_report",
"dimensions": ["date", "brandingInterest"],
"metrics": [
"totalUsers",
"newUsers",
"engagedSessions",
"engagementRate",
"conversions",
"totalRevenue"
]
},
{
"name": "tech_browser_report",
"dimensions": ["date", "browser"],
"metrics": [
"totalUsers",
"newUsers",
"engagedSessions",
"engagementRate",
"eventCount",
"conversions",
"totalRevenue"
]
},
{
"name": "tech_device_category_report",
"dimensions": ["date", "deviceCategory"],
"metrics": [
"totalUsers",
"newUsers",
"engagedSessions",
"engagementRate",
"eventCount",
"conversions",
"totalRevenue"
]
},
{
"name": "tech_device_model_report",
"dimensions": ["date", "deviceModel"],
"metrics": [
"totalUsers",
"newUsers",
"engagedSessions",
"engagementRate",
"eventCount",
"conversions",
"totalRevenue"
]
},
{
"name": "tech_screen_resolution_report",
"dimensions": ["date", "screenResolution"],
"metrics": [
"totalUsers",
"newUsers",
"engagedSessions",
"engagementRate",
"eventCount",
"conversions",
"totalRevenue"
]
},
{
"name": "tech_app_version_report",
"dimensions": ["date", "appVersion"],
"metrics": [
"totalUsers",
"newUsers",
"engagedSessions",
"engagementRate",
"eventCount",
"conversions",
"totalRevenue"
]
},
{
"name": "tech_platform_report",
"dimensions": ["date", "platform"],
"metrics": [
"totalUsers",
"newUsers",
"engagedSessions",
"engagementRate",
"eventCount",
"conversions",
"totalRevenue"
]
},
{
"name": "tech_platform_device_category_report",
"dimensions": ["date", "platform", "deviceCategory"],
"metrics": [
"totalUsers",
"newUsers",
"engagedSessions",
"engagementRate",
"eventCount",
"conversions",
"totalRevenue"
]
},
{
"name": "tech_operating_system_report",
"dimensions": ["date", "operatingSystem"],
"metrics": [
"totalUsers",
"newUsers",
"engagedSessions",
"engagementRate",
"eventCount",
"conversions",
"totalRevenue"
]
},
{
"name": "tech_os_with_version_report",
"dimensions": ["date", "operatingSystemWithVersion"],
"metrics": [
"totalUsers",
"newUsers",
"engagedSessions",
"engagementRate",
"eventCount",
"conversions",
"totalRevenue"
]
}
]

View File

@@ -1,26 +0,0 @@
#
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
#
from typing import Mapping, Type, Union
from airbyte_cdk.models import FailureType
from airbyte_cdk.sources.streams.http.error_handlers.default_error_mapping import DEFAULT_ERROR_MAPPING
from airbyte_cdk.sources.streams.http.error_handlers.response_models import ErrorResolution
from source_google_analytics_data_api.utils import WRONG_CUSTOM_REPORT_CONFIG
def get_google_analytics_data_api_base_error_mapping(report_name) -> Mapping[Union[int, str, Type[Exception]], ErrorResolution]:
"""
Updates the base default error mapping with a friendly config error message that includes the stream report name
"""
stream_error_mapping = {}
for error_key, base_error_resolution in DEFAULT_ERROR_MAPPING.items():
if base_error_resolution.failure_type in (FailureType.config_error, FailureType.system_error):
stream_error_mapping[error_key] = ErrorResolution(
response_action=base_error_resolution.response_action,
failure_type=FailureType.config_error,
error_message=WRONG_CUSTOM_REPORT_CONFIG.format(report=report_name),
)
else:
stream_error_mapping[error_key] = base_error_resolution
return stream_error_mapping

View File

@@ -1,30 +0,0 @@
#
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
#
from typing import Mapping, Type, Union
from airbyte_cdk.models import FailureType
from airbyte_cdk.sources.streams.http.error_handlers.default_error_mapping import DEFAULT_ERROR_MAPPING
from airbyte_cdk.sources.streams.http.error_handlers.response_models import ErrorResolution, ResponseAction
PROPERTY_ID_DOCS_URL = "https://developers.google.com/analytics/devguides/reporting/data/v1/property-id#what_is_my_property_id"
MESSAGE = "Incorrect Property ID: {property_id}. Access was denied to the property ID entered. Check your access to the Property ID or use Google Analytics {property_id_docs_url} to find your Property ID."
def get_google_analytics_data_api_metadata_error_mapping(property_id) -> Mapping[Union[int, str, Type[Exception]], ErrorResolution]:
"""
Adds friendly messages to bad request and forbidden responses that include the property ID and the documentation guidance.
"""
return DEFAULT_ERROR_MAPPING | {
403: ErrorResolution(
response_action=ResponseAction.FAIL,
failure_type=FailureType.config_error,
error_message=MESSAGE.format(property_id=property_id, property_id_docs_url=PROPERTY_ID_DOCS_URL),
),
400: ErrorResolution(
response_action=ResponseAction.FAIL,
failure_type=FailureType.config_error,
error_message=MESSAGE.format(property_id=property_id, property_id_docs_url=PROPERTY_ID_DOCS_URL),
),
}

View File

@@ -4,15 +4,52 @@
import sys
import traceback
from datetime import datetime
from typing import List
from airbyte_cdk.entrypoint import launch
from orjson import orjson
from airbyte_cdk.entrypoint import AirbyteEntrypoint, launch, logger
from airbyte_cdk.exception_handler import init_uncaught_exception_handler
from airbyte_cdk.models import AirbyteErrorTraceMessage, AirbyteMessage, AirbyteMessageSerializer, AirbyteTraceMessage, TraceType, Type
from source_google_analytics_data_api import SourceGoogleAnalyticsDataApi
from source_google_analytics_data_api.config_migrations import MigrateCustomReports, MigrateCustomReportsCohortSpec, MigratePropertyID
def run():
source = SourceGoogleAnalyticsDataApi()
MigratePropertyID.migrate(sys.argv[1:], source)
MigrateCustomReports.migrate(sys.argv[1:], source)
MigrateCustomReportsCohortSpec.migrate(sys.argv[1:], source)
launch(source, sys.argv[1:])
def _get_source(args: List[str]):
catalog_path = AirbyteEntrypoint.extract_catalog(args)
config_path = AirbyteEntrypoint.extract_config(args)
state_path = AirbyteEntrypoint.extract_state(args)
try:
return SourceGoogleAnalyticsDataApi(
SourceGoogleAnalyticsDataApi.read_catalog(catalog_path) if catalog_path else None,
SourceGoogleAnalyticsDataApi.read_config(config_path) if config_path else None,
SourceGoogleAnalyticsDataApi.read_state(state_path) if state_path else None,
)
except Exception as error:
print(
orjson.dumps(
AirbyteMessageSerializer.dump(
AirbyteMessage(
type=Type.TRACE,
trace=AirbyteTraceMessage(
type=TraceType.ERROR,
emitted_at=int(datetime.now().timestamp() * 1000),
error=AirbyteErrorTraceMessage(
message=f"Error starting the sync. This could be due to an invalid configuration or catalog. Please contact Support for assistance. Error: {error}",
stack_trace=traceback.format_exc(),
),
),
)
)
).decode()
)
return None
def run() -> None:
init_uncaught_exception_handler(logger)
_args = sys.argv[1:]
source = _get_source(_args)
if source:
launch(source, _args)

View File

@@ -1,681 +1,14 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
#
import copy
import datetime
import json
import logging
import pkgutil
import re
import uuid
from abc import ABC
from datetime import timedelta
from http import HTTPStatus
from typing import Any, Dict, Iterable, List, Mapping, MutableMapping, Optional, Set, Tuple, Type, Union
from typing import Any, Mapping, Optional
import dpath
import jsonschema
import pendulum
import requests
from requests import HTTPError
from airbyte_cdk.models import ConfiguredAirbyteCatalog
from airbyte_cdk.sources.declarative.yaml_declarative_source import YamlDeclarativeSource
from airbyte_cdk.sources.source import TState
from airbyte_cdk.models import FailureType, SyncMode
from airbyte_cdk.sources import AbstractSource
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.sources.streams.http import HttpStream
from airbyte_cdk.sources.streams.http.error_handlers import BackoffStrategy, ErrorHandler, HttpStatusErrorHandler
from airbyte_cdk.sources.streams.http.error_handlers.default_error_mapping import DEFAULT_ERROR_MAPPING
from airbyte_cdk.sources.streams.http.error_handlers.response_models import ErrorResolution
from airbyte_cdk.sources.streams.http.exceptions import BaseBackoffException
from airbyte_cdk.sources.streams.http.http_client import MessageRepresentationAirbyteTracedErrors
from airbyte_cdk.utils import AirbyteTracedException
from source_google_analytics_data_api import utils
from source_google_analytics_data_api.google_analytics_data_api_base_error_mapping import get_google_analytics_data_api_base_error_mapping
from source_google_analytics_data_api.google_analytics_data_api_metadata_error_mapping import (
get_google_analytics_data_api_metadata_error_mapping,
)
from source_google_analytics_data_api.utils import DATE_FORMAT, WRONG_DIMENSIONS, WRONG_JSON_SYNTAX, WRONG_METRICS
from .api_quota import GoogleAnalyticsApiQuota
from .utils import (
authenticator_class_map,
check_invalid_property_error,
check_no_property_error,
get_dimensions_type,
get_metrics_type,
get_source_defined_primary_key,
metrics_type_to_python,
serialize_to_date_string,
transform_json,
)
# set the quota handler globally since limitations are the same for all streams
# the initial values should be saved once and tracked for each stream, inclusively.
GoogleAnalyticsQuotaHandler: GoogleAnalyticsApiQuota = GoogleAnalyticsApiQuota()
DEFAULT_LOOKBACK_WINDOW = 2
class ConfigurationError(Exception):
pass
class MetadataDescriptor:
def __init__(self):
self._metadata = None
def __get__(self, instance, owner):
if not self._metadata:
stream = GoogleAnalyticsDataApiMetadataStream(config=instance.config, authenticator=instance.config["authenticator"])
metadata = None
try:
metadata = next(stream.read_records(sync_mode=SyncMode.full_refresh), None)
except HTTPError as e:
if e.response.status_code in [HTTPStatus.UNAUTHORIZED, HTTPStatus.FORBIDDEN]:
internal_message = "Unauthorized error reached."
message = "Can not get metadata with unauthorized credentials. Try to re-authenticate in source settings."
unauthorized_error = AirbyteTracedException(
message=message, internal_message=internal_message, failure_type=FailureType.config_error
)
raise unauthorized_error
if not metadata:
raise Exception("failed to get metadata, over quota, try later")
self._metadata = {
"dimensions": {m.get("apiName"): m for m in metadata.get("dimensions", [{}])},
"metrics": {m.get("apiName"): m for m in metadata.get("metrics", [{}])},
}
return self._metadata
class GoogleAnalyticsDataApiBackoffStrategy(BackoffStrategy):
def backoff_time(
self, response_or_exception: Optional[Union[requests.Response, requests.RequestException]], **kwargs: Any
) -> Optional[float]:
if isinstance(response_or_exception, requests.Response):
# handle the error with prepared GoogleAnalyticsQuotaHandler backoff value
if response_or_exception.status_code == requests.codes.too_many_requests:
return GoogleAnalyticsQuotaHandler.backoff_time
return None
class GoogleAnalyticsDatApiErrorHandler(HttpStatusErrorHandler):
QUOTA_RECOVERY_TIME = 3600
def __init__(
self,
logger: logging.Logger,
error_mapping: Optional[Mapping[Union[int, str, type[Exception]], ErrorResolution]] = None,
) -> None:
super().__init__(
logger=logger,
error_mapping=error_mapping,
max_retries=5,
max_time=timedelta(seconds=GoogleAnalyticsDatApiErrorHandler.QUOTA_RECOVERY_TIME),
)
@GoogleAnalyticsQuotaHandler.handle_quota()
def interpret_response(self, response_or_exception: Optional[Union[requests.Response, Exception]] = None) -> ErrorResolution:
if not isinstance(response_or_exception, Exception) and response_or_exception.status_code == requests.codes.too_many_requests:
return ErrorResolution(
response_action=GoogleAnalyticsQuotaHandler.response_action,
failure_type=FailureType.transient_error,
error_message=GoogleAnalyticsQuotaHandler.error_message,
)
return super().interpret_response(response_or_exception)
class GoogleAnalyticsDataApiAbstractStream(HttpStream, ABC):
url_base = "https://analyticsdata.googleapis.com/v1beta/"
http_method = "POST"
def __init__(self, *, config: Mapping[str, Any], page_size: int = 100_000, **kwargs):
self._config = config
self._source_defined_primary_key = get_source_defined_primary_key(self.name)
# the default value is 100,000, the maximum page size per the official documentation
# https://developers.google.com/analytics/devguides/reporting/data/v1/basics#pagination
self._page_size = page_size
super().__init__(**kwargs)
@property
def config(self):
return self._config
@property
def page_size(self):
return self._page_size
@page_size.setter
def page_size(self, value: int):
self._page_size = value
# handle the quota errors with prepared values for:
# `should_retry`, `backoff_time`, `raise_on_http_errors`, `stop_iter` based on quota scenario.
def get_backoff_strategy(self) -> Optional[Union[BackoffStrategy, List[BackoffStrategy]]]:
return GoogleAnalyticsDataApiBackoffStrategy()
def get_error_handler(self) -> Optional[ErrorHandler]:
return GoogleAnalyticsDatApiErrorHandler(logger=self.logger, error_mapping=self.get_error_mapping())
def get_error_mapping(self) -> Mapping[Union[int, str, Type[Exception]], ErrorResolution]:
return DEFAULT_ERROR_MAPPING
class GoogleAnalyticsDataApiBaseStream(GoogleAnalyticsDataApiAbstractStream):
"""
https://developers.google.com/analytics/devguides/reporting/data/v1/rest/v1beta/properties/runReport
"""
_record_date_format = "%Y%m%d"
offset = 0
metadata = MetadataDescriptor()
def get_error_mapping(self) -> Mapping[Union[int, str, Type[Exception]], ErrorResolution]:
return get_google_analytics_data_api_base_error_mapping(self.name)
@property
def cursor_field(self) -> Optional[str]:
date_fields = ["date", "yearWeek", "yearMonth", "year"]
for field in date_fields:
if field in self.config.get("dimensions", []):
return field
return []
@property
def primary_key(self):
pk = ["property_id"] + self.config.get("dimensions", [])
if "cohort_spec" not in self.config and "date" not in pk:
pk.append("startDate")
pk.append("endDate")
return pk
@staticmethod
def add_dimensions(dimensions, row) -> dict:
return dict(zip(dimensions, [v["value"] for v in row["dimensionValues"]]))
@staticmethod
def add_metrics(metrics, metric_types, row) -> dict:
def _metric_type_to_python(metric_data: Tuple[str, str]) -> Any:
metric_name, metric_value = metric_data
python_type = metrics_type_to_python(metric_types[metric_name])
# Google Analytics sometimes returns float for integer metrics.
# So this is a workaround for this issue: https://github.com/airbytehq/oncall/issues/4130
if python_type == int:
return metric_name, round(float(metric_value))
return metric_name, python_type(metric_value)
return dict(map(_metric_type_to_python, zip(metrics, [v["value"] for v in row["metricValues"]])))
def get_json_schema(self) -> Mapping[str, Any]:
"""
Override get_json_schema CDK method to retrieve the schema information for GoogleAnalyticsV4 Object dynamically.
"""
schema: Dict[str, Any] = {
"$schema": "https://json-schema.org/draft-07/schema#",
"type": ["null", "object"],
"additionalProperties": True,
"properties": {
"property_id": {"type": ["string"]},
},
}
schema["properties"].update(
{
d: {
"type": get_dimensions_type(d),
"description": self.metadata["dimensions"].get(d, {}).get("description", d),
}
for d in self.config["dimensions"]
}
)
# skipping startDate and endDate fields for cohort stream, because it doesn't support startDate and endDate fields
if "cohort_spec" not in self.config and "date" not in self.config["dimensions"]:
schema["properties"].update(
{
"startDate": {"type": ["null", "string"], "format": "date"},
"endDate": {"type": ["null", "string"], "format": "date"},
}
)
schema["properties"].update(
{
m: {
"type": ["null", get_metrics_type(self.metadata["metrics"].get(m, {}).get("type"))],
"description": self.metadata["metrics"].get(m, {}).get("description", m),
}
for m in self.config["metrics"]
}
)
# change the type of `conversions:*` metrics from int to float: https://github.com/airbytehq/oncall/issues/4130
if self.config.get("convert_conversions_event", False):
for schema_field in schema["properties"]:
if schema_field.startswith("conversions:"):
schema["properties"][schema_field]["type"] = ["null", "float"]
return schema
def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
r = response.json()
if "rowCount" in r:
total_rows = r["rowCount"]
if self.offset == 0:
self.offset = self.page_size
else:
self.offset += self.page_size
if total_rows <= self.offset:
self.offset = 0
return
return {"offset": self.offset}
def path(
self, *, stream_state: Mapping[str, Any] = None, stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
) -> str:
return f"properties/{self.config['property_id']}:runReport"
def parse_response(
self,
response: requests.Response,
*,
stream_state: Mapping[str, Any],
stream_slice: Mapping[str, Any] = None,
next_page_token: Mapping[str, Any] = None,
) -> Iterable[Mapping]:
r = response.json()
dimensions = [h.get("name") for h in r.get("dimensionHeaders", [{}])]
metrics = [h.get("name") for h in r.get("metricHeaders", [{}])]
metrics_type_map = {h.get("name"): h.get("type") for h in r.get("metricHeaders", [{}]) if "name" in h}
# change the type of `conversions:*` metrics from int to float: https://github.com/airbytehq/oncall/issues/4130
if self.config.get("convert_conversions_event", False):
for schema_field in metrics_type_map:
if schema_field.startswith("conversions:"):
metrics_type_map[schema_field] = "TYPE_FLOAT"
for row in r.get("rows", []):
record = {
"property_id": self.config["property_id"],
**self.add_dimensions(dimensions, row),
**self.add_metrics(metrics, metrics_type_map, row),
}
# https://github.com/airbytehq/airbyte/pull/26283
# We pass the uuid field for synchronizations that still have the old
# configured_catalog with the old primary key. We need it to avoid removal of rows
# during deduplication. As soon as the customer presses "refresh source schema",
# this part is no longer needed.
if self._source_defined_primary_key == [["uuid"]]:
record["uuid"] = str(uuid.uuid4())
if "cohort_spec" not in self.config and "date" not in record:
record["startDate"] = stream_slice["startDate"]
record["endDate"] = stream_slice["endDate"]
yield record
def get_updated_state(
self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]
) -> MutableMapping[str, Any]:
if not self.cursor_field:
# Some implementations of the GoogleAnalyticsDataApiBaseStream might not have a cursor because it's
# based on the `dimensions` config setting. This results in a full_refresh only stream that implements
# get_updated_state(), but does not define a cursor. For this scenario, there is no state value to extract
return {}
updated_state = (
utils.string_to_date(latest_record[self.cursor_field], self._record_date_format)
if self.cursor_field == "date"
else latest_record[self.cursor_field]
)
stream_state_value = current_stream_state.get(self.cursor_field)
if stream_state_value:
stream_state_value = (
utils.string_to_date(stream_state_value, self._record_date_format, old_format=DATE_FORMAT)
if self.cursor_field == "date"
else stream_state_value
)
updated_state = max(updated_state, stream_state_value)
current_stream_state[self.cursor_field] = (
updated_state.strftime(self._record_date_format) if self.cursor_field == "date" else updated_state
)
return current_stream_state
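# Illustrative sketch of the state handling above, assuming a stream whose dimensions
# include "date": the cursor is written in the record format "%Y%m%d", while older state
# in the legacy "%Y-%m-%d" format is still accepted, and the later of the two dates wins:
#
#   get_updated_state({"date": "2024-01-15"}, {"date": "20240110"})
#   # -> {"date": "20240115"}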
def request_body_json(
self,
stream_state: Mapping[str, Any],
stream_slice: Mapping[str, Any] = None,
next_page_token: Mapping[str, Any] = None,
) -> Optional[Mapping]:
if stream_slice and "startDate" in stream_slice and "endDate" in stream_slice:
date_range = {"startDate": stream_slice["startDate"], "endDate": stream_slice["endDate"]}
else:
date_range = stream_slice
payload = {
"metrics": [{"name": m} for m in self.config["metrics"]],
"dimensions": [{"name": d} for d in self.config["dimensions"]],
"dateRanges": [date_range],
"returnPropertyQuota": True,
"offset": str(0),
"limit": str(self.page_size),
"keepEmptyRows": self.config.get("keep_empty_rows", False),
}
dimension_filter = self.config.get("dimensionFilter")
if dimension_filter:
payload.update({"dimensionFilter": dimension_filter})
metrics_filter = self.config.get("metricsFilter")
if metrics_filter:
payload.update({"metricsFilter": metrics_filter})
if next_page_token and next_page_token.get("offset") is not None:
payload.update({"offset": str(next_page_token["offset"])})
return payload
def stream_slices(
self, *, sync_mode: SyncMode, cursor_field: List[str] = None, stream_state: Mapping[str, Any] = None
) -> Iterable[Optional[Mapping[str, Any]]]:
today: datetime.date = datetime.date.today()
start_date = None
if self.cursor_field:
start_date = stream_state and stream_state.get(self.cursor_field)
if start_date:
start_date = (
serialize_to_date_string(start_date, DATE_FORMAT, self.cursor_field) if not self.cursor_field == "date" else start_date
)
start_date = utils.string_to_date(start_date, self._record_date_format, old_format=DATE_FORMAT)
start_date = start_date - datetime.timedelta(days=self.config.get("lookback_window", DEFAULT_LOOKBACK_WINDOW))
start_date = max(start_date, self.config["date_ranges_start_date"])
else:
start_date = self.config["date_ranges_start_date"]
end_date = min(self.config.get("date_ranges_end_date", today), today)
while start_date <= end_date:
# stop producing slices if 429 + specific scenario is hit
# see GoogleAnalyticsQuotaHandler for more info.
if GoogleAnalyticsQuotaHandler.stop_iter:
return []
else:
yield {
"startDate": utils.date_to_string(start_date),
"endDate": utils.date_to_string(min(start_date + datetime.timedelta(days=self.config["window_in_days"] - 1), end_date)),
}
start_date += datetime.timedelta(days=self.config["window_in_days"])
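# Illustrative sketch of the slicing above, assuming no previous state,
# date_ranges_start_date = 2024-01-01, date_ranges_end_date = 2024-01-10 and
# window_in_days = 7; the generator would yield:
#
#   {"startDate": "2024-01-01", "endDate": "2024-01-07"}
#   {"startDate": "2024-01-08", "endDate": "2024-01-10"}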
class PivotReport(GoogleAnalyticsDataApiBaseStream):
def request_body_json(
self,
stream_state: Mapping[str, Any],
stream_slice: Mapping[str, Any] = None,
next_page_token: Mapping[str, Any] = None,
) -> Optional[Mapping]:
payload = super().request_body_json(stream_state, stream_slice, next_page_token)
# remove the offset and limit fields, which are absent from the runPivotReport request:
# https://developers.google.com/analytics/devguides/reporting/data/v1/rest/v1beta/properties/runPivotReport
payload.pop("offset", None)
payload.pop("limit", None)
payload["pivots"] = self.config["pivots"]
return payload
def path(
self, *, stream_state: Mapping[str, Any] = None, stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
) -> str:
return f"properties/{self.config['property_id']}:runPivotReport"
class CohortReportMixin:
cursor_field = []
def stream_slices(
self, *, sync_mode: SyncMode, cursor_field: List[str] = None, stream_state: Mapping[str, Any] = None
) -> Iterable[Optional[Mapping[str, Any]]]:
yield from [{}]
def request_body_json(
self,
stream_state: Mapping[str, Any],
stream_slice: Mapping[str, Any] = None,
next_page_token: Mapping[str, Any] = None,
) -> Optional[Mapping]:
# https://developers.google.com/analytics/devguides/reporting/data/v1/rest/v1beta/CohortSpec#Cohort.FIELDS.date_range
# In a cohort request, this dateRange is required and the dateRanges in the RunReportRequest or RunPivotReportRequest
# must be unspecified.
payload = super().request_body_json(stream_state, stream_slice, next_page_token)
payload.pop("dateRanges")
payload["cohortSpec"] = self.config["cohort_spec"]
return payload
class GoogleAnalyticsDataApiMetadataStream(GoogleAnalyticsDataApiAbstractStream):
"""
https://developers.google.com/analytics/devguides/reporting/data/v1/rest/v1beta/properties/getMetadata
"""
primary_key = None
http_method = "GET"
def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
return None
def path(
self, *, stream_state: Mapping[str, Any] = None, stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
) -> str:
return f"properties/{self.config['property_id']}/metadata"
def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
yield response.json()
def get_error_mapping(self):
return get_google_analytics_data_api_metadata_error_mapping(self.config.get("property_id"))
class SourceGoogleAnalyticsDataApi(AbstractSource):
@property
def default_date_ranges_start_date(self) -> str:
# set default date ranges start date to 2 years ago
return pendulum.now(tz="UTC").subtract(years=2).format("YYYY-MM-DD")
@property
def default_date_ranges_end_date(self) -> str:
# set default date ranges end date to today
return pendulum.now(tz="UTC").format("YYYY-MM-DD")
@property
def raise_exception_on_missing_stream(self) -> bool:
# reference issue: https://github.com/airbytehq/airbyte-internal-issues/issues/8315
# This has been added because a `Custom Stream` may be removed from the `input configuration`,
# which would otherwise raise a `missing stream` error for a stream present in the CATALOG but not in the `input configuration`.
return False
def _validate_and_transform_date(self, date: str, default_date: str) -> datetime.date:
date = default_date if not date else date
try:
date = utils.string_to_date(date)
except ValueError as e:
raise ConfigurationError(str(e))
return date
def _validate_custom_reports(self, config: Mapping[str, Any]) -> Mapping[str, Any]:
if "custom_reports_array" in config:
if isinstance(config["custom_reports_array"], str):
try:
config["custom_reports_array"] = json.loads(config["custom_reports_array"])
if not isinstance(config["custom_reports_array"], list):
raise ValueError
except ValueError:
raise ConfigurationError(WRONG_JSON_SYNTAX)
else:
config["custom_reports_array"] = []
return config
def _validate_and_transform(self, config: Mapping[str, Any], report_names: Set[str]):
config = self._validate_custom_reports(config)
schema = json.loads(pkgutil.get_data("source_google_analytics_data_api", "defaults/custom_reports_schema.json"))
try:
jsonschema.validate(instance=config["custom_reports_array"], schema=schema)
except jsonschema.ValidationError as e:
if message := check_no_property_error(e):
raise ConfigurationError(message)
if message := check_invalid_property_error(e):
report_name = dpath.get(config["custom_reports_array"], str(e.absolute_path[0])).get("name")
raise ConfigurationError(message.format(fields=e.message, report_name=report_name))
existing_names = {r["name"] for r in config["custom_reports_array"]} & report_names
if existing_names:
existing_names = ", ".join(existing_names)
raise ConfigurationError(f"Custom reports: {existing_names} already exist as a default report(s).")
if "credentials_json" in config["credentials"]:
try:
config["credentials"]["credentials_json"] = json.loads(config["credentials"]["credentials_json"])
except ValueError:
raise ConfigurationError("credentials.credentials_json is not valid JSON")
config["date_ranges_start_date"] = self._validate_and_transform_date(
config.get("date_ranges_start_date"), self.default_date_ranges_start_date
)
config["date_ranges_end_date"] = self._validate_and_transform_date(
config.get("date_ranges_end_date"), self.default_date_ranges_end_date
)
if config["date_ranges_start_date"] > config["date_ranges_end_date"]:
raise ConfigurationError(
"End date '"
+ config["date_ranges_end_date"].strftime("%Y-%m-%d")
+ "' can not be before start date '"
+ config["date_ranges_start_date"].strftime("%Y-%m-%d")
+ "'"
)
if not config.get("window_in_days"):
source_spec = self.spec(logging.getLogger("airbyte"))
config["window_in_days"] = source_spec.connectionSpecification["properties"]["window_in_days"]["default"]
return config
def get_authenticator(self, config: Mapping[str, Any]):
credentials = config["credentials"]
authenticator_class, get_credentials = authenticator_class_map[credentials["auth_type"]]
return authenticator_class(**get_credentials(credentials))
def check_connection(self, logger: logging.Logger, config: Mapping[str, Any]) -> Tuple[bool, Optional[Any]]:
for property_id in config["property_ids"]:
reports = json.loads(pkgutil.get_data("source_google_analytics_data_api", "defaults/default_reports.json"))
try:
config = self._validate_and_transform(config, report_names={r["name"] for r in reports})
except ConfigurationError as e:
return False, str(e)
config["authenticator"] = self.get_authenticator(config)
_config = config.copy()
_config["property_id"] = property_id
metadata = None
try:
# explicitly set a small page size for the check operation to avoid OOM issues
stream = GoogleAnalyticsDataApiMetadataStream(config=_config, authenticator=_config["authenticator"])
metadata = next(stream.read_records(sync_mode=SyncMode.full_refresh), None)
except (MessageRepresentationAirbyteTracedErrors, BaseBackoffException) as ex:
if hasattr(ex, "failure_type") and ex.failure_type == FailureType.config_error:
# bad request and forbidden are set in mapper as config errors
raise ex
logger.error(f"Check failed", exc_info=ex)
if not metadata:
return False, "Failed to get metadata, over quota, try later"
dimensions = {d["apiName"] for d in metadata["dimensions"]}
metrics = {d["apiName"] for d in metadata["metrics"]}
for report in _config["custom_reports_array"]:
# Check whether the custom report dimensions are supported by comparing them with the dimensions provided by the GA API
invalid_dimensions = set(report["dimensions"]) - dimensions
if invalid_dimensions:
invalid_dimensions = ", ".join(invalid_dimensions)
return False, WRONG_DIMENSIONS.format(fields=invalid_dimensions, report_name=report["name"])
# Check whether the custom report metrics are supported by comparing them with the metrics provided by the GA API
invalid_metrics = set(report["metrics"]) - metrics
if invalid_metrics:
invalid_metrics = ", ".join(invalid_metrics)
return False, WRONG_METRICS.format(fields=invalid_metrics, report_name=report["name"])
report_stream = self.instantiate_report_class(report, False, _config, page_size=100)
# check if custom_report dimensions + metrics can be combined and report generated
try:
stream_slice = next(report_stream.stream_slices(sync_mode=SyncMode.full_refresh))
next(report_stream.read_records(sync_mode=SyncMode.full_refresh, stream_slice=stream_slice), None)
except MessageRepresentationAirbyteTracedErrors as e:
return False, f"{e.message} {self._extract_internal_message_error_response(e.internal_message)}"
return True, None
def streams(self, config: Mapping[str, Any]) -> List[Stream]:
reports = json.loads(pkgutil.get_data("source_google_analytics_data_api", "defaults/default_reports.json"))
config = self._validate_and_transform(config, report_names={r["name"] for r in reports})
config["authenticator"] = self.get_authenticator(config)
return [stream for report in reports + config["custom_reports_array"] for stream in self.instantiate_report_streams(report, config)]
def instantiate_report_streams(
self, report: dict, config: Mapping[str, Any], **extra_kwargs
) -> Iterable[GoogleAnalyticsDataApiBaseStream]:
add_name_suffix = False
for property_id in config["property_ids"]:
yield self.instantiate_report_class(
report=report, add_name_suffix=add_name_suffix, config={**config, "property_id": property_id}
)
# Append property ID to stream name only for the second and subsequent properties.
# This will make a release non-breaking for users with a single property.
# This is a temporary solution until https://github.com/airbytehq/airbyte/issues/30926 is implemented.
add_name_suffix = True
@staticmethod
def instantiate_report_class(
report: dict, add_name_suffix: bool, config: Mapping[str, Any], **extra_kwargs
) -> GoogleAnalyticsDataApiBaseStream:
cohort_spec = report.get("cohortSpec", {})
pivots = report.get("pivots")
stream_config = {
**config,
"metrics": report["metrics"],
"dimensions": report["dimensions"],
"dimensionFilter": transform_json(report.get("dimensionFilter", {})),
"metricsFilter": transform_json(report.get("metricsFilter", {})),
}
report_class_tuple = (GoogleAnalyticsDataApiBaseStream,)
if pivots:
stream_config["pivots"] = pivots
report_class_tuple = (PivotReport,)
if cohort_spec.pop("enabled", "") == "true":
stream_config["cohort_spec"] = cohort_spec
report_class_tuple = (CohortReportMixin, *report_class_tuple)
name = report["name"]
if add_name_suffix:
name = f"{name}Property{config['property_id']}"
return type(name, report_class_tuple, {})(config=stream_config, authenticator=config["authenticator"], **extra_kwargs)
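# Illustrative sketch of the dynamic class creation above, assuming a default report
# named "devices" synced for a second property id "222222222" (so add_name_suffix is True);
# the call is then equivalent to instantiating
#
#   type("devicesProperty222222222", (GoogleAnalyticsDataApiBaseStream,), {})
#
# i.e. a per-property subclass from whose name the CDK derives the stream name.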
@staticmethod
def _extract_internal_message_error_response(message):
pattern = r"error message '(.*?)'"
match = re.search(pattern, message)
if match:
error_message = match.group(1)
return error_message
return ""
class SourceGoogleAnalyticsDataApi(YamlDeclarativeSource):
def __init__(self, catalog: Optional[ConfiguredAirbyteCatalog], config: Optional[Mapping[str, Any]], state: TState, **kwargs):
super().__init__(catalog=catalog, config=config, state=state, **{"path_to_yaml": "manifest.yaml"})


@@ -1,249 +0,0 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
import argparse
import calendar
import datetime
import json
import sys
from typing import Dict
import jsonschema
import pandas as pd
from airbyte_cdk.sources.streams.http import requests_native_auth as auth
from source_google_analytics_data_api.authenticator import GoogleServiceKeyAuthenticator
DATE_FORMAT = "%Y-%m-%d"
metrics_data_native_types_map: Dict = {
"METRIC_TYPE_UNSPECIFIED": str,
"TYPE_INTEGER": int,
"TYPE_FLOAT": float,
"TYPE_SECONDS": float,
"TYPE_MILLISECONDS": float,
"TYPE_MINUTES": float,
"TYPE_HOURS": float,
"TYPE_STANDARD": float,
"TYPE_CURRENCY": float,
"TYPE_FEET": float,
"TYPE_MILES": float,
"TYPE_METERS": float,
"TYPE_KILOMETERS": float,
}
metrics_data_types_map: Dict = {
"METRIC_TYPE_UNSPECIFIED": "string",
"TYPE_INTEGER": "integer",
"TYPE_FLOAT": "number",
"TYPE_SECONDS": "number",
"TYPE_MILLISECONDS": "number",
"TYPE_MINUTES": "number",
"TYPE_HOURS": "number",
"TYPE_STANDARD": "number",
"TYPE_CURRENCY": "number",
"TYPE_FEET": "number",
"TYPE_MILES": "number",
"TYPE_METERS": "number",
"TYPE_KILOMETERS": "number",
}
authenticator_class_map: Dict = {
"Service": (GoogleServiceKeyAuthenticator, lambda credentials: {"credentials": credentials["credentials_json"]}),
"Client": (
auth.Oauth2Authenticator,
lambda credentials: {
"token_refresh_endpoint": "https://oauth2.googleapis.com/token",
"scopes": ["https://www.googleapis.com/auth/analytics.readonly"],
"client_secret": credentials["client_secret"],
"client_id": credentials["client_id"],
"refresh_token": credentials["refresh_token"],
},
),
}
WRONG_JSON_SYNTAX = "The custom report entered is not in a JSON array format. Check the entered format follows the syntax in our docs: https://docs.airbyte.com/integrations/sources/google-analytics-data-api/"
NO_NAME = "The custom report entered does not contain a name, which is required. Check the entered format follows the syntax in our docs: https://docs.airbyte.com/integrations/sources/google-analytics-data-api/"
NO_DIMENSIONS = "The custom report entered does not contain dimensions, which is required. Check the entered format follows the syntax in our docs (https://docs.airbyte.com/integrations/sources/google-analytics-data-api/) and validate your custom query with the GA 4 Query Explorer (https://ga-dev-tools.google/ga4/query-explorer/)."
NO_METRICS = "The custom report entered does not contain metrics, which is required. Check the entered format follows the syntax in our docs (https://docs.airbyte.com/integrations/sources/google-analytics-data-api/) and validate your custom query with the GA 4 Query Explorer (https://ga-dev-tools.google/ga4/query-explorer/)."
WRONG_DIMENSIONS = "The custom report {report_name} entered contains invalid dimensions: {fields}. Validate your custom query with the GA 4 Query Explorer (https://ga-dev-tools.google/ga4/query-explorer/)."
WRONG_METRICS = "The custom report {report_name} entered contains invalid metrics: {fields}. Validate your custom query with the GA 4 Query Explorer (https://ga-dev-tools.google/ga4/query-explorer/)."
WRONG_PIVOTS = "The custom report {report_name} entered contains invalid pivots: {fields}. Ensure the pivot follow the syntax described in the docs (https://developers.google.com/analytics/devguides/reporting/data/v1/rest/v1beta/Pivot)."
API_LIMIT_PER_HOUR = "Your API key has reached its limit for the hour. Wait until the quota refreshes in an hour to retry."
WRONG_CUSTOM_REPORT_CONFIG = "Please check configuration for custom report {report}."
def datetime_to_secs(dt: datetime.datetime) -> int:
return calendar.timegm(dt.utctimetuple())
def string_to_date(d: str, f: str = DATE_FORMAT, old_format=None) -> datetime.date:
# To convert the old STATE date format "YYYY-MM-DD" to the new format "YYYYMMDD" we need this additional `old_format` param.
# As soon as all current cloud syncs have been converted to the new format, we can remove this double-format support.
if old_format:
try:
return datetime.datetime.strptime(d, old_format).date()
except ValueError:
pass
return datetime.datetime.strptime(d, f).date()
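# Illustrative sketch of the dual-format support above; both the legacy "YYYY-MM-DD"
# state format and the new "YYYYMMDD" one parse to the same date:
#
#   string_to_date("2023-09-01", "%Y%m%d", old_format=DATE_FORMAT)  # -> datetime.date(2023, 9, 1)
#   string_to_date("20230901", "%Y%m%d", old_format=DATE_FORMAT)    # -> datetime.date(2023, 9, 1)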
def date_to_string(d: datetime.date, f: str = DATE_FORMAT) -> str:
return d.strftime(f)
def get_metrics_type(t: str) -> str:
return metrics_data_types_map.get(t, "number")
def metrics_type_to_python(t: str) -> type:
return metrics_data_native_types_map.get(t, str)
def get_dimensions_type(d: str) -> str:
return "string"
def check_no_property_error(exc: jsonschema.ValidationError) -> str:
mapper = {
"'name' is a required property": NO_NAME,
"'dimensions' is a required property": NO_DIMENSIONS,
"'metrics' is a required property": NO_METRICS,
}
return mapper.get(exc.message)
def check_invalid_property_error(exc: jsonschema.ValidationError) -> str:
mapper = {"dimensions": WRONG_DIMENSIONS, "metrics": WRONG_METRICS, "pivots": WRONG_PIVOTS}
for property in mapper:
if property in exc.schema_path:
return mapper[property]
def get_source_defined_primary_key(stream):
"""
https://github.com/airbytehq/airbyte/pull/26283
This is not a very elegant way to get source_defined_primary_key inside the stream.
It's used only for a smooth transition to the new primary key.
As soon as the transition is complete, we can remove this function.
"""
if len(sys.argv) > 1 and "read" == sys.argv[1]:
parser = argparse.ArgumentParser()
subparsers = parser.add_subparsers()
read_subparser = subparsers.add_parser("read")
read_subparser.add_argument("--catalog", type=str, required=True)
args, unknown = parser.parse_known_args()
catalog = json.loads(open(args.catalog).read())
res = {s["stream"]["name"]: s["stream"].get("source_defined_primary_key") for s in catalog["streams"]}
return res.get(stream)
def transform_string_filter(filter):
string_filter = {"value": filter.get("value")}
if "matchType" in filter:
string_filter["matchType"] = filter.get("matchType")[0]
if "caseSensitive" in filter:
string_filter["caseSensitive"] = filter.get("caseSensitive")
return {"stringFilter": string_filter}
def transform_in_list_filter(filter):
in_list_filter = {"values": filter.get("values")}
if "caseSensitive" in filter:
in_list_filter["caseSensitive"] = filter.get("caseSensitive")
return {"inListFilter": in_list_filter}
def transform_numeric_filter(filter):
numeric_filter = {
"value": {filter.get("value").get("value_type"): filter.get("value").get("value")},
}
if "operation" in filter:
numeric_filter["operation"] = filter.get("operation")[0]
return {"numericFilter": numeric_filter}
def transform_between_filter(filter):
from_value = filter.get("fromValue")
to_value = filter.get("toValue")
from_value_type = from_value.get("value_type")
to_value_type = to_value.get("value_type")
if from_value_type == "doubleValue" and isinstance(from_value.get("value"), str):
from_value["value"] = float(from_value.get("value"))
if to_value_type == "doubleValue" and isinstance(to_value.get("value"), str):
to_value["value"] = float(to_value.get("value"))
return {
"betweenFilter": {
"fromValue": {from_value_type: from_value.get("value")},
"toValue": {to_value_type: to_value.get("value")},
}
}
def transform_expression(expression):
transformed_expression = {"fieldName": expression.get("field_name")}
filter = expression.get("filter")
filter_name = filter.get("filter_name")
if filter_name == "stringFilter":
transformed_expression.update(transform_string_filter(filter))
elif filter_name == "inListFilter":
transformed_expression.update(transform_in_list_filter(filter))
elif filter_name == "numericFilter":
transformed_expression.update(transform_numeric_filter(filter))
elif filter_name == "betweenFilter":
transformed_expression.update(transform_between_filter(filter))
return {"filter": transformed_expression}
def transform_json(original_json):
transformed_json = {}
filter_type = original_json.get("filter_type")
if filter_type in ["andGroup", "orGroup"]:
expressions = original_json.get("expressions", [])
transformed_expressions = [transform_expression(exp) for exp in expressions]
transformed_json = {filter_type: {"expressions": transformed_expressions}} if transformed_expressions else {}
elif filter_type == "notExpression":
expression = original_json.get("expression")
transformed_expression = transform_expression(expression)
transformed_json = {filter_type: transformed_expression}
elif filter_type == "filter":
transformed_json = transform_expression(original_json)
return transformed_json
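# Illustrative sketch of the filter transformation above for a simple single-field
# config filter (the field and values here are made up for illustration):
#
#   transform_json({
#       "filter_type": "filter",
#       "field_name": "browser",
#       "filter": {"filter_name": "stringFilter", "value": "Chrome", "matchType": ["EXACT"]},
#   })
#   # -> {"filter": {"fieldName": "browser", "stringFilter": {"value": "Chrome", "matchType": "EXACT"}}}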
def serialize_to_date_string(date: str, date_format: str, date_type: str) -> str:
"""
Serialize a date string to a different date format based on the date_type.
Parameters:
- date (str): The input date string.
- date_format (str): The desired output format for the date string.
- date_type (str): The type of the date string ('yearWeek', 'yearMonth', or 'year').
Returns:
str: The date string formatted according to date_format.
Examples:
'202245' -> '2022-11-07'
'202210' -> '2022-10-01'
'2022' -> '2022-01-01'
"""
if date_type == "yearWeek":
return pd.to_datetime(f"{date}1", format="%Y%W%w").strftime(date_format)
elif date_type == "yearMonth":
year = int(date[:-2])
month = int(date[-2:])
return datetime.datetime(year, month, 1).strftime(date_format)
return datetime.datetime(int(date), 1, 1).strftime(date_format)


@@ -1,3 +0,0 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#


@@ -1,100 +0,0 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
import datetime
import json
from copy import deepcopy
import pytest
# json credentials with fake private key
json_credentials = """
{
"type": "service_account",
"project_id": "unittest-project-id",
"private_key_id": "9qf98e52oda52g5ne23al6evnf13649c2u077162c",
"private_key": "-----BEGIN PRIVATE KEY-----\\nMIIBVQIBADANBgkqhkiG9w0BAQEFAASCAT8wggE7AgEAAkEA3slcXL+dA36ESmOi\\n1xBhZmp5Hn0WkaHDtW4naba3plva0ibloBNWhFhjQOh7Ff01PVjhT4D5jgqXBIgc\\nz9Gv3QIDAQABAkEArlhYPoD5SB2/O1PjwHgiMPrL1C9B9S/pr1cH4vPJnpY3VKE3\\n5hvdil14YwRrcbmIxMkK2iRLi9lM4mJmdWPy4QIhAPsRFXZSGx0TZsDxD9V0ZJmZ\\n0AuDCj/NF1xB5KPLmp7pAiEA4yoFox6w7ql/a1pUVaLt0NJkDfE+22pxYGNQaiXU\\nuNUCIQCsFLaIJZiN4jlgbxlyLVeya9lLuqIwvqqPQl6q4ad12QIgS9gG48xmdHig\\n8z3IdIMedZ8ZCtKmEun6Cp1+BsK0wDUCIF0nHfSuU+eTQ2qAON2SHIrJf8UeFO7N\\nzdTN1IwwQqjI\\n-----END PRIVATE KEY-----\\n",
"client_email": "google-analytics-access@unittest-project-id.iam.gserviceaccount.com",
"client_id": "213243192021686092537",
"auth_uri": "https://accounts.google.com/o/oauth2/auth",
"token_uri": "https://oauth2.googleapis.com/token",
"auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
"client_x509_cert_url": "https://www.googleapis.com/robot/v1/metadata/x509/google-analytics-access%40unittest-project-id.iam.gserviceaccount.com"
}
"""
@pytest.fixture
def one_year_ago():
return datetime.datetime.strftime((datetime.datetime.now() - datetime.timedelta(days=1)), "%Y-%m-%d")
@pytest.fixture
def config(one_year_ago):
return {
"property_id": "108176369",
"property_ids": ["108176369"],
"credentials": {"auth_type": "Service", "credentials_json": json_credentials},
"date_ranges_start_date": one_year_ago,
"dimensions": ["date", "deviceCategory", "operatingSystem", "browser"],
"metrics": [
"totalUsers",
"newUsers",
"sessions",
"sessionsPerUser",
"averageSessionDuration",
"screenPageViews",
"screenPageViewsPerSession",
"bounceRate",
],
"keep_empty_rows": True,
"custom_reports": json.dumps(
[
{
"name": "report1",
"dimensions": ["date", "browser"],
"metrics": ["totalUsers", "sessions", "screenPageViews"],
}
]
),
}
@pytest.fixture
def config_without_date_range():
return {
"property_id": "108176369",
"property_ids": ["108176369"],
"credentials": {"auth_type": "Service", "credentials_json": json_credentials},
"dimensions": ["deviceCategory", "operatingSystem", "browser"],
"metrics": [
"totalUsers",
"newUsers",
"sessions",
"sessionsPerUser",
"averageSessionDuration",
"screenPageViews",
"screenPageViewsPerSession",
"bounceRate",
],
"custom_reports": [],
}
@pytest.fixture
def patch_base_class(one_year_ago, config_without_date_range):
return {"config": config_without_date_range}
@pytest.fixture
def config_gen(config):
def inner(**kwargs):
new_config = deepcopy(config)
# WARNING: deep dictionaries are not supported
new_config.update(kwargs)
return {k: v for k, v in new_config.items() if v is not ...}
return inner


@@ -1,158 +0,0 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
import pytest
import requests
from source_google_analytics_data_api.api_quota import GoogleAnalyticsApiQuota
from airbyte_cdk.sources.streams.http.error_handlers.response_models import ResponseAction
TEST_QUOTA_INSTANCE: GoogleAnalyticsApiQuota = GoogleAnalyticsApiQuota()
@pytest.fixture(name="expected_quota_list")
def expected_quota_list():
"""The Quota were currently handle"""
return ["concurrentRequests", "tokensPerProjectPerHour", "potentiallyThresholdedRequestsPerHour"]
def test_check_initial_quota_is_empty():
"""
Check that the initial quota property is empty (== None) but ready to be filled.
"""
assert not TEST_QUOTA_INSTANCE.initial_quota
@pytest.mark.parametrize(
("response_quota", "partial_quota", "response_action_exp", "backoff_time_exp", "stop_iter_exp"),
[
# Full Quota
(
{
"propertyQuota": {
"concurrentRequests": {"consumed": 0, "remaining": 10},
"tokensPerProjectPerHour": {"consumed": 1, "remaining": 1735},
"potentiallyThresholdedRequestsPerHour": {"consumed": 1, "remaining": 26},
}
},
False, # partial_quota
ResponseAction.RETRY,
None, # backoff_time_exp
False, # stop_iter_exp
),
# Partial Quota
(
{
"propertyQuota": {
"concurrentRequests": {"consumed": 0, "remaining": 10},
"tokensPerProjectPerHour": {"consumed": 5, "remaining": 955},
"potentiallyThresholdedRequestsPerHour": {"consumed": 3, "remaining": 26},
}
},
True, # partial_quota
ResponseAction.RETRY,
None, # backoff_time_exp
False, # stop_iter_exp
),
# Running out `tokensPerProjectPerHour`
(
{
"propertyQuota": {
"concurrentRequests": {"consumed": 2, "remaining": 8},
"tokensPerProjectPerHour": {
"consumed": 5,
# ~9% from original quota is left
"remaining": 172,
},
"potentiallyThresholdedRequestsPerHour": {"consumed": 3, "remaining": 26},
}
},
True, # partial_quota
ResponseAction.RETRY,
1800, # backoff_time_exp
False, # stop_iter_exp
),
# Running out `concurrentRequests`
(
{
"propertyQuota": {
"concurrentRequests": {
"consumed": 9,
# 10% from original quota is left
"remaining": 1,
},
"tokensPerProjectPerHour": {"consumed": 5, "remaining": 935},
"potentiallyThresholdedRequestsPerHour": {"consumed": 1, "remaining": 26},
}
},
True, # partial_quota
ResponseAction.RETRY,
30, # backoff_time_exp
False, # stop_iter_exp
),
# Running out `potentiallyThresholdedRequestsPerHour`
(
{
"propertyQuota": {
"concurrentRequests": {"consumed": 1, "remaining": 9},
"tokensPerProjectPerHour": {"consumed": 5, "remaining": 935},
"potentiallyThresholdedRequestsPerHour": {
# 7% from original quota is left
"consumed": 26,
"remaining": 2,
},
}
},
True, # partial_quota
ResponseAction.RETRY,
1800, # backoff_time_exp
False, # stop_iter_exp
),
],
ids=[
"Full",
"Partial",
"Running out tokensPerProjectPerHour",
"Running out concurrentRequests",
"Running out potentiallyThresholdedRequestsPerHour",
],
)
def test_check_full_quota(
requests_mock,
expected_quota_list,
response_quota,
partial_quota,
response_action_exp,
backoff_time_exp,
stop_iter_exp,
):
"""
Check the quota and prepare the initial values for comparison with subsequent response calls.
The default values for the scenario are expected when the quota is full.
"""
# Prepare instance
url = "https://analyticsdata.googleapis.com/v1beta/"
payload = response_quota
requests_mock.post(url, json=payload)
response = requests.post(url)
# process and prepare the scenario
TEST_QUOTA_INSTANCE._check_quota(response)
# TEST BLOCK
# Check the INITIAL QUOTA is saved properly
assert all(quota in expected_quota_list for quota in TEST_QUOTA_INSTANCE.initial_quota.keys())
# Check the CURRENT QUOTA is different from Initial
if partial_quota:
current_quota = TEST_QUOTA_INSTANCE._get_known_quota_from_response(response.json().get("propertyQuota"))
assert not current_quota == TEST_QUOTA_INSTANCE.initial_quota
# Check the scenario is applied based on Quota Values
assert TEST_QUOTA_INSTANCE.response_action is response_action_exp
# backoff_time
assert TEST_QUOTA_INSTANCE.backoff_time == backoff_time_exp
# stop_iter
assert TEST_QUOTA_INSTANCE.stop_iter is stop_iter_exp


@@ -1,30 +0,0 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
import requests
from freezegun import freeze_time
from source_google_analytics_data_api.authenticator import GoogleServiceKeyAuthenticator
@freeze_time("2023-01-01 00:00:00")
def test_token_rotation(requests_mock):
credentials = {
"client_email": "client_email",
"private_key": "-----BEGIN PRIVATE KEY-----\nMIIBVQIBADANBgkqhkiG9w0BAQEFAASCAT8wggE7AgEAAkEA3slcXL+dA36ESmOi\n1xBhZmp5Hn0WkaHDtW4naba3plva0ibloBNWhFhjQOh7Ff01PVjhT4D5jgqXBIgc\nz9Gv3QIDAQABAkEArlhYPoD5SB2/O1PjwHgiMPrL1C9B9S/pr1cH4vPJnpY3VKE3\n5hvdil14YwRrcbmIxMkK2iRLi9lM4mJmdWPy4QIhAPsRFXZSGx0TZsDxD9V0ZJmZ\n0AuDCj/NF1xB5KPLmp7pAiEA4yoFox6w7ql/a1pUVaLt0NJkDfE+22pxYGNQaiXU\nuNUCIQCsFLaIJZiN4jlgbxlyLVeya9lLuqIwvqqPQl6q4ad12QIgS9gG48xmdHig\n8z3IdIMedZ8ZCtKmEun6Cp1+BsK0wDUCIF0nHfSuU+eTQ2qAON2SHIrJf8UeFO7N\nzdTN1IwwQqjI\n-----END PRIVATE KEY-----\n",
"client_id": "client_id",
}
authenticator = GoogleServiceKeyAuthenticator(credentials)
auth_request = requests_mock.register_uri(
"POST", authenticator._google_oauth2_token_endpoint, json={"access_token": "bearer_token", "expires_in": 3600}
)
authenticated_request = authenticator(requests.Request())
assert auth_request.call_count == 1
assert auth_request.last_request.qs.get("assertion") == [
"eyjhbgcioijsuzi1niisimtpzci6imnsawvudf9pzcisinr5cci6ikpxvcj9.eyjpc3mioijjbgllbnrfzw1hawwilcjzy29wzsi6imh0dhbzoi8vd3d3lmdvb2dszwfwaxmuy29tl2f1dggvyw5hbhl0awnzlnjlywrvbmx5iiwiyxvkijoiahr0chm6ly9vyxv0adiuz29vz2xlyxbpcy5jb20vdg9rzw4ilcjlehaioje2nzi1mzq4mdasimlhdci6mty3mjuzmtiwmh0.oy_do4cxytjclgajcutbolxftlba89bt2ipuegmis7crh9no_q9h4ispv7iquz5d5h58tpftjhdayb5jfuvheq"
]
assert auth_request.last_request.qs.get("grant_type") == ["urn:ietf:params:oauth:grant-type:jwt-bearer"]
assert authenticator._token.get("expires_at") == 1672534800
assert authenticated_request.headers.get("Authorization") == "Bearer bearer_token"


@@ -1,37 +0,0 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
from unittest.mock import patch
from source_google_analytics_data_api import SourceGoogleAnalyticsDataApi
from source_google_analytics_data_api.config_migrations import MigratePropertyID
from airbyte_cdk.entrypoint import AirbyteEntrypoint
@patch.object(SourceGoogleAnalyticsDataApi, "read_config")
@patch.object(SourceGoogleAnalyticsDataApi, "write_config")
@patch.object(AirbyteEntrypoint, "extract_config")
def test_migration(ab_entrypoint_extract_config_mock, source_write_config_mock, source_read_config_mock):
source = SourceGoogleAnalyticsDataApi()
source_read_config_mock.return_value = {
"credentials": {"auth_type": "Service", "credentials_json": "<credentials string ...>"},
"custom_reports": "<custom reports out of current test>",
"date_ranges_start_date": "2023-09-01",
"window_in_days": 30,
"property_id": "111111111",
}
ab_entrypoint_extract_config_mock.return_value = "/path/to/config.json"
def check_migrated_value(new_config, path):
assert path == "/path/to/config.json"
assert "property_id" not in new_config
assert "property_ids" in new_config
assert "111111111" in new_config["property_ids"]
assert len(new_config["property_ids"]) == 1
source_write_config_mock.side_effect = check_migrated_value
MigratePropertyID.migrate(["--config", "/path/to/config.json"], source)


@@ -1,59 +0,0 @@
{
"credentials": {
"auth_type": "Service",
"credentials_json": ""
},
"date_ranges_start_date": "2023-09-01",
"window_in_days": 30,
"property_ids": "314186564",
"custom_reports_array": [
{
"name": "cohort_report",
"dimensions": ["cohort", "cohortNthDay"],
"metrics": ["cohortActiveUsers"],
"cohortSpec": {
"cohorts": [
{
"dimension": "firstSessionDate",
"dateRange": {
"startDate": "2023-04-24",
"endDate": "2023-04-24"
}
}
],
"cohortsRange": {
"endOffset": 100,
"granularity": "DAILY"
},
"cohortReportSettings": {
"accumulate": false
}
}
},
{
"name": "pivot_report",
"dateRanges": [
{
"startDate": "2020-09-01",
"endDate": "2020-09-15"
}
],
"dimensions": ["browser", "country", "language"],
"metrics": ["sessions"],
"pivots": [
{
"fieldNames": ["browser"],
"limit": 5
},
{
"fieldNames": ["country"],
"limit": 250
},
{
"fieldNames": ["language"],
"limit": 15
}
]
}
]
}


@@ -1,48 +0,0 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
import json
import os
from typing import Any, Mapping
import dpath.util
from source_google_analytics_data_api.config_migrations import MigrateCustomReportsCohortSpec
from source_google_analytics_data_api.source import SourceGoogleAnalyticsDataApi
from airbyte_cdk.models import OrchestratorType, Type
from airbyte_cdk.sources import Source
# BASE ARGS
CMD = "check"
TEST_CONFIG_PATH = f"{os.path.dirname(__file__)}/test_config.json"
NEW_TEST_CONFIG_PATH = f"{os.path.dirname(__file__)}/test_new_config.json"
SOURCE_INPUT_ARGS = [CMD, "--config", TEST_CONFIG_PATH]
SOURCE: Source = SourceGoogleAnalyticsDataApi()
# HELPERS
def load_config(config_path: str = TEST_CONFIG_PATH) -> Mapping[str, Any]:
with open(config_path, "r") as config:
return json.load(config)
def test_migrate_config(capsys):
migration_instance = MigrateCustomReportsCohortSpec()
# migrate the test_config
migration_instance.migrate(SOURCE_INPUT_ARGS, SOURCE)
what = capsys.readouterr().out
control_msg = json.loads(what)
assert control_msg["type"] == Type.CONTROL.value
assert control_msg["control"]["type"] == OrchestratorType.CONNECTOR_CONFIG.value
assert control_msg["control"]["connectorConfig"]["config"]["custom_reports_array"][0]["cohortSpec"]["enabled"] == "true"
assert control_msg["control"]["connectorConfig"]["config"]["custom_reports_array"][1]["cohortSpec"]["enabled"] == "false"
def test_should_not_migrate_new_config():
new_config = load_config(NEW_TEST_CONFIG_PATH)
assert not MigrateCustomReportsCohortSpec._should_migrate(new_config)


@@ -1,63 +0,0 @@
{
"credentials": {
"auth_type": "Service",
"credentials_json": ""
},
"date_ranges_start_date": "2023-09-01",
"window_in_days": 30,
"property_ids": "314186564",
"custom_reports_array": [
{
"name": "cohort_report",
"dimensions": ["cohort", "cohortNthDay"],
"metrics": ["cohortActiveUsers"],
"cohortSpec": {
"cohorts": [
{
"dimension": "firstSessionDate",
"dateRange": {
"startDate": "2023-04-24",
"endDate": "2023-04-24"
}
}
],
"cohortsRange": {
"endOffset": 100,
"granularity": "DAILY"
},
"cohortReportSettings": {
"accumulate": false
},
"enable": "true"
}
},
{
"name": "pivot_report",
"dateRanges": [
{
"startDate": "2020-09-01",
"endDate": "2020-09-15"
}
],
"dimensions": ["browser", "country", "language"],
"metrics": ["sessions"],
"pivots": [
{
"fieldNames": ["browser"],
"limit": 5
},
{
"fieldNames": ["country"],
"limit": 250
},
{
"fieldNames": ["language"],
"limit": 15
}
],
"cohortSpec": {
"enabled": "false"
}
}
]
}

View File

@@ -1,7 +0,0 @@
{
"credentials": { "auth_type": "Service", "credentials_json": "" },
"custom_reports": "[{\"name\": \"custom_dimensions\", \"dimensions\": [\"date\", \"country\", \"device\"]}]",
"date_ranges_start_date": "2023-09-01",
"window_in_days": 30,
"property_ids": "314186564"
}


@@ -1,77 +0,0 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
import json
from typing import Any, Mapping
from source_google_analytics_data_api.config_migrations import MigrateCustomReports
from source_google_analytics_data_api.source import SourceGoogleAnalyticsDataApi
from airbyte_cdk.models import OrchestratorType, Type
from airbyte_cdk.sources import Source
# BASE ARGS
CMD = "check"
TEST_CONFIG_PATH = "unit_tests/test_migrations/test_config.json"
SOURCE_INPUT_ARGS = [CMD, "--config", TEST_CONFIG_PATH]
SOURCE: Source = SourceGoogleAnalyticsDataApi()
# HELPERS
def load_config(config_path: str = TEST_CONFIG_PATH) -> Mapping[str, Any]:
with open(config_path, "r") as config:
return json.load(config)
def revert_migration(config_path: str = TEST_CONFIG_PATH) -> None:
with open(config_path, "r") as test_config:
config = json.load(test_config)
config.pop("custom_reports_array")
with open(config_path, "w") as updated_config:
config = json.dumps(config)
updated_config.write(config)
def test_migrate_config(capsys):
migration_instance = MigrateCustomReports()
original_config = load_config()
# migrate the test_config
migration_instance.migrate(SOURCE_INPUT_ARGS, SOURCE)
# load the updated config
test_migrated_config = load_config()
# check migrated property
assert "custom_reports_array" in test_migrated_config
assert isinstance(test_migrated_config["custom_reports_array"], list)
# check the old property is in place
assert "custom_reports" in test_migrated_config
assert isinstance(test_migrated_config["custom_reports"], str)
# check that the migration is skipped once it has already been done
assert not migration_instance._should_migrate(test_migrated_config)
# load the old custom reports VS migrated
assert json.loads(original_config["custom_reports"]) == test_migrated_config["custom_reports_array"]
# test CONTROL MESSAGE was emitted
control_msg = json.loads(capsys.readouterr().out)
assert control_msg["type"] == Type.CONTROL.value
assert control_msg["control"]["type"] == OrchestratorType.CONNECTOR_CONFIG.value
# old custom_reports are still type(str)
assert isinstance(control_msg["control"]["connectorConfig"]["config"]["custom_reports"], str)
# new custom_reports are type(list)
assert isinstance(control_msg["control"]["connectorConfig"]["config"]["custom_reports_array"], list)
# check the migrated values
assert control_msg["control"]["connectorConfig"]["config"]["custom_reports_array"][0]["name"] == "custom_dimensions"
assert control_msg["control"]["connectorConfig"]["config"]["custom_reports_array"][0]["dimensions"] == ["date", "country", "device"]
# revert the test_config to the starting point
revert_migration()
def test_config_is_reverted():
# check the test_config state; it has to be the same as before the tests
test_config = load_config()
# check the config no longer has the migrated property
assert "custom_reports_array" not in test_config
# check the old property is still there
assert "custom_reports" in test_config
assert isinstance(test_config["custom_reports"], str)


@@ -1,259 +0,0 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
from unittest.mock import MagicMock, patch
import pytest
from source_google_analytics_data_api import SourceGoogleAnalyticsDataApi
from source_google_analytics_data_api.api_quota import GoogleAnalyticsApiQuotaBase
from source_google_analytics_data_api.source import GoogleAnalyticsDatApiErrorHandler, MetadataDescriptor
from source_google_analytics_data_api.utils import NO_DIMENSIONS, NO_METRICS, NO_NAME, WRONG_CUSTOM_REPORT_CONFIG, WRONG_JSON_SYNTAX
from airbyte_cdk.models import AirbyteConnectionStatus, FailureType, Status
from airbyte_cdk.sources.streams.http.http import HttpStatusErrorHandler
from airbyte_cdk.utils import AirbyteTracedException
@pytest.mark.parametrize(
"config_values, is_successful, message",
[
({}, Status.SUCCEEDED, None),
({"custom_reports_array": ...}, Status.SUCCEEDED, None),
({"custom_reports_array": "[]"}, Status.SUCCEEDED, None),
({"custom_reports_array": "invalid"}, Status.FAILED, f"'{WRONG_JSON_SYNTAX}'"),
({"custom_reports_array": "{}"}, Status.FAILED, f"'{WRONG_JSON_SYNTAX}'"),
({"custom_reports_array": "[{}]"}, Status.FAILED, f"'{NO_NAME}'"),
({"custom_reports_array": '[{"name": "name"}]'}, Status.FAILED, f"'{NO_DIMENSIONS}'"),
({"custom_reports_array": '[{"name": "daily_active_users", "dimensions": ["date"]}]'}, Status.FAILED, f"'{NO_METRICS}'"),
(
{"custom_reports_array": '[{"name": "daily_active_users", "metrics": ["totalUsers"], "dimensions": [{"name": "city"}]}]'},
Status.FAILED,
"\"The custom report daily_active_users entered contains invalid dimensions: {'name': 'city'} is not of type 'string'. Validate your custom query with the GA 4 Query Explorer (https://ga-dev-tools.google/ga4/query-explorer/).\"",
),
({"date_ranges_start_date": "2022-20-20"}, Status.FAILED, "\"time data '2022-20-20' does not match format '%Y-%m-%d'\""),
({"date_ranges_end_date": "2022-20-20"}, Status.FAILED, "\"time data '2022-20-20' does not match format '%Y-%m-%d'\""),
(
{"date_ranges_start_date": "2022-12-20", "date_ranges_end_date": "2022-12-10"},
Status.FAILED,
"\"End date '2022-12-10' can not be before start date '2022-12-20'\"",
),
(
{"credentials": {"auth_type": "Service", "credentials_json": "invalid"}},
Status.FAILED,
"'credentials.credentials_json is not valid JSON'",
),
(
{"custom_reports_array": '[{"name": "name", "dimensions": [], "metrics": []}]'},
Status.FAILED,
"'The custom report name entered contains invalid dimensions: [] is too short. Validate your custom query with the GA 4 Query Explorer (https://ga-dev-tools.google/ga4/query-explorer/).'",
),
(
{"custom_reports_array": '[{"name": "daily_active_users", "dimensions": ["date"], "metrics": ["totalUsers"]}]'},
Status.FAILED,
"'Custom reports: daily_active_users already exist as a default report(s).'",
),
(
{"custom_reports_array": '[{"name": "name", "dimensions": ["unknown"], "metrics": ["totalUsers"]}]'},
Status.FAILED,
"'The custom report name entered contains invalid dimensions: unknown. Validate your custom query with the GA 4 Query Explorer (https://ga-dev-tools.google/ga4/query-explorer/).'",
),
(
{"custom_reports_array": '[{"name": "name", "dimensions": ["date"], "metrics": ["unknown"]}]'},
Status.FAILED,
"'The custom report name entered contains invalid metrics: unknown. Validate your custom query with the GA 4 Query Explorer (https://ga-dev-tools.google/ga4/query-explorer/).'",
),
(
{
"custom_reports_array": '[{"name": "pivot_report", "dateRanges": [{ "startDate": "2020-09-01", "endDate": "2020-09-15" }], "dimensions": ["browser", "country", "language"], "metrics": ["sessions"], "pivots": {}}]'
},
Status.FAILED,
"\"The custom report pivot_report entered contains invalid pivots: {} is not of type 'null', 'array'. Ensure the pivot follow the syntax described in the docs (https://developers.google.com/analytics/devguides/reporting/data/v1/rest/v1beta/Pivot).\"",
),
],
)
def test_check(requests_mock, config_gen, config_values, is_successful, message):
requests_mock.register_uri(
"POST", "https://oauth2.googleapis.com/token", json={"access_token": "access_token", "expires_in": 3600, "token_type": "Bearer"}
)
requests_mock.register_uri(
"GET",
"https://analyticsdata.googleapis.com/v1beta/properties/108176369/metadata",
json={
"dimensions": [{"apiName": "date"}, {"apiName": "country"}, {"apiName": "language"}, {"apiName": "browser"}],
"metrics": [{"apiName": "totalUsers"}, {"apiName": "screenPageViews"}, {"apiName": "sessions"}],
},
)
requests_mock.register_uri(
"POST",
"https://analyticsdata.googleapis.com/v1beta/properties/108176369:runReport",
json={
"dimensionHeaders": [{"name": "date"}, {"name": "country"}],
"metricHeaders": [{"name": "totalUsers", "type": "s"}, {"name": "screenPageViews", "type": "m"}],
"rows": [],
},
)
source = SourceGoogleAnalyticsDataApi()
logger = MagicMock()
assert source.check(logger, config_gen(**config_values)) == AirbyteConnectionStatus(status=is_successful, message=message)
@pytest.mark.parametrize("error_code", (400, 403))
def test_check_failure_throws_exception(requests_mock, config_gen, error_code):
requests_mock.register_uri(
"POST", "https://oauth2.googleapis.com/token", json={"access_token": "access_token", "expires_in": 3600, "token_type": "Bearer"}
)
requests_mock.register_uri(
"GET", "https://analyticsdata.googleapis.com/v1beta/properties/UA-11111111/metadata", json={}, status_code=error_code
)
source = SourceGoogleAnalyticsDataApi()
logger = MagicMock()
with pytest.raises(AirbyteTracedException) as e:
source.check(logger, config_gen(property_ids=["UA-11111111"]))
assert e.value.failure_type == FailureType.config_error
assert "Access was denied to the property ID entered." in e.value.message
def test_exhausted_quota_recovers_after_two_retries(requests_mock, config_gen):
"""
If the account runs out of quota, the API returns a message asking us to back off for one hour.
We set the backoff time for this scenario to 30 minutes, to check whether the quota has already recovered; if not,
it backs off for another 30 minutes, by which point the quota should be re-established.
Since we don't want to wait one hour to test this retry behavior, we scale time by dividing both the quota
recovery time and the backoff time by 600.
"""
requests_mock.register_uri(
"POST", "https://oauth2.googleapis.com/token", json={"access_token": "access_token", "expires_in": 3600, "token_type": "Bearer"}
)
error_response = {
"error": {
"message": "Exhausted potentially thresholded requests quota. This quota will refresh in under an hour. To learn more, see"
}
}
requests_mock.register_uri(
"GET",
"https://analyticsdata.googleapis.com/v1beta/properties/UA-11111111/metadata",
# first try we get 429 t=~0
[
{"json": error_response, "status_code": 429},
# first retry we get 429 t=~1800
{"json": error_response, "status_code": 429},
# second retry quota is recovered, t=~3600
{
"json": {
"dimensions": [{"apiName": "date"}, {"apiName": "country"}, {"apiName": "language"}, {"apiName": "browser"}],
"metrics": [{"apiName": "totalUsers"}, {"apiName": "screenPageViews"}, {"apiName": "sessions"}],
},
"status_code": 200,
},
],
)
def fix_time(time):
return int(time / 600)
source = SourceGoogleAnalyticsDataApi()
logger = MagicMock()
max_time_fixed = fix_time(GoogleAnalyticsDatApiErrorHandler.QUOTA_RECOVERY_TIME)
potentially_thresholded_requests_per_hour_mapping = GoogleAnalyticsApiQuotaBase.quota_mapping["potentiallyThresholdedRequestsPerHour"]
threshold_backoff_time = potentially_thresholded_requests_per_hour_mapping["backoff"]
fixed_threshold_backoff_time = fix_time(threshold_backoff_time)
potentially_thresholded_requests_per_hour_mapping_fixed = {
**potentially_thresholded_requests_per_hour_mapping,
"backoff": fixed_threshold_backoff_time,
}
with (
patch.object(GoogleAnalyticsDatApiErrorHandler, "QUOTA_RECOVERY_TIME", new=max_time_fixed),
patch.object(
GoogleAnalyticsApiQuotaBase,
"quota_mapping",
new={
**GoogleAnalyticsApiQuotaBase.quota_mapping,
"potentiallyThresholdedRequestsPerHour": potentially_thresholded_requests_per_hour_mapping_fixed,
},
),
):
output = source.check(logger, config_gen(property_ids=["UA-11111111"]))
assert output == AirbyteConnectionStatus(status=Status.SUCCEEDED, message=None)
@pytest.mark.parametrize("error_code", (402, 404, 405))
def test_check_failure(requests_mock, config_gen, error_code):
requests_mock.register_uri(
"POST", "https://oauth2.googleapis.com/token", json={"access_token": "access_token", "expires_in": 3600, "token_type": "Bearer"}
)
requests_mock.register_uri(
"GET", "https://analyticsdata.googleapis.com/v1beta/properties/UA-11111111/metadata", json={}, status_code=error_code
)
source = SourceGoogleAnalyticsDataApi()
logger = MagicMock()
with patch.object(HttpStatusErrorHandler, "max_retries", new=0):
airbyte_status = source.check(logger, config_gen(property_ids=["UA-11111111"]))
assert airbyte_status.status == Status.FAILED
assert airbyte_status.message == repr("Failed to get metadata, over quota, try later")
@pytest.mark.parametrize(
("status_code", "response_error_message"),
(
(403, "Forbidden for some reason"),
(400, "Granularity in the cohortsRange is required."),
),
)
def test_check_incorrect_custom_reports_config(requests_mock, config_gen, status_code, response_error_message):
requests_mock.register_uri(
"POST", "https://oauth2.googleapis.com/token", json={"access_token": "access_token", "expires_in": 3600, "token_type": "Bearer"}
)
requests_mock.register_uri(
"GET",
"https://analyticsdata.googleapis.com/v1beta/properties/108176369/metadata",
json={
"dimensions": [{"apiName": "date"}, {"apiName": "country"}, {"apiName": "language"}, {"apiName": "browser"}],
"metrics": [{"apiName": "totalUsers"}, {"apiName": "screenPageViews"}, {"apiName": "sessions"}],
},
)
requests_mock.register_uri(
"POST",
"https://analyticsdata.googleapis.com/v1beta/properties/108176369:runReport",
status_code=status_code,
json={"error": {"message": response_error_message}},
)
report_name = "cohort_report"
config = {"custom_reports_array": f'[{{"name": "{report_name}", "dimensions": ["date"], "metrics": ["totalUsers"]}}]'}
friendly_message = WRONG_CUSTOM_REPORT_CONFIG.format(report=report_name)
source = SourceGoogleAnalyticsDataApi()
logger = MagicMock()
status, message = source.check_connection(logger, config_gen(**config))
assert status is False
assert message == f"{friendly_message} {response_error_message}"
@pytest.mark.parametrize("status_code", (403, 401))
def test_missing_metadata(requests_mock, status_code):
# required as the `instance` argument for MetadataDescriptor.__get__
class TestConfig:
config = {
"authenticator": None,
"property_id": 123,
}
# mocking the url for metadata
requests_mock.register_uri(
"GET", "https://analyticsdata.googleapis.com/v1beta/properties/123/metadata", json={}, status_code=status_code
)
metadata_descriptor = MetadataDescriptor()
with pytest.raises(AirbyteTracedException) as e:
metadata_descriptor.__get__(TestConfig(), None)
assert e.value.failure_type == FailureType.config_error
def test_streams(patch_base_class, config_gen):
config = config_gen(property_ids=["Prop1", "PropN"])
source = SourceGoogleAnalyticsDataApi()
streams = source.streams(config)
expected_streams_number = 57 * 2
assert len([stream for stream in streams if "_property_" in stream.name]) == 57
assert len(set(streams)) == expected_streams_number


@@ -1,477 +0,0 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
import datetime
import json
import random
from http import HTTPStatus
from typing import Any, Mapping
from unittest.mock import MagicMock
import pytest
from freezegun import freeze_time
from requests.models import Response
from source_google_analytics_data_api.source import GoogleAnalyticsDataApiBaseStream, SourceGoogleAnalyticsDataApi
from airbyte_cdk.sources.streams.http.error_handlers.response_models import ErrorResolution, FailureType, ResponseAction
from .utils import read_incremental
@pytest.fixture
def patch_base_class(mocker, config, config_without_date_range):
# Mock abstract methods to enable instantiating abstract class
mocker.patch.object(GoogleAnalyticsDataApiBaseStream, "path", f"{random.randint(100000000, 999999999)}:runReport")
mocker.patch.object(GoogleAnalyticsDataApiBaseStream, "primary_key", "test_primary_key")
mocker.patch.object(GoogleAnalyticsDataApiBaseStream, "__abstractmethods__", set())
return {"config": config, "config_without_date_range": config_without_date_range}
def test_json_schema(requests_mock, patch_base_class):
requests_mock.register_uri(
"POST", "https://oauth2.googleapis.com/token", json={"access_token": "access_token", "expires_in": 3600, "token_type": "Bearer"}
)
requests_mock.register_uri(
"GET",
"https://analyticsdata.googleapis.com/v1beta/properties/108176369/metadata",
json={
"dimensions": [{"apiName": "date"}, {"apiName": "country"}, {"apiName": "language"}, {"apiName": "browser"}],
"metrics": [{"apiName": "totalUsers"}, {"apiName": "screenPageViews"}, {"apiName": "sessions"}],
},
)
schema = GoogleAnalyticsDataApiBaseStream(
authenticator=MagicMock(), config={"authenticator": MagicMock(), **patch_base_class["config_without_date_range"]}
).get_json_schema()
for d in patch_base_class["config_without_date_range"]["dimensions"]:
assert d in schema["properties"]
for p in patch_base_class["config_without_date_range"]["metrics"]:
assert p in schema["properties"]
assert "startDate" in schema["properties"]
assert "endDate" in schema["properties"]
def test_request_params(patch_base_class):
assert (
GoogleAnalyticsDataApiBaseStream(authenticator=MagicMock(), config=patch_base_class["config"]).request_params(
stream_state=MagicMock(), stream_slice=MagicMock(), next_page_token=MagicMock()
)
== {}
)
def test_request_body_json(patch_base_class):
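    # The request body must carry the configured metrics and dimensions, the slice's dateRanges, and the default 100000-row limit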
stream_slice = {"startDate": "2024-01-01", "endDate": "2024-01-31"}
request_body_params = {"stream_state": MagicMock(), "stream_slice": stream_slice, "next_page_token": None}
expected_body_json = {
"metrics": [
{"name": "totalUsers"},
{"name": "newUsers"},
{"name": "sessions"},
{"name": "sessionsPerUser"},
{"name": "averageSessionDuration"},
{"name": "screenPageViews"},
{"name": "screenPageViewsPerSession"},
{"name": "bounceRate"},
],
"dimensions": [
{"name": "date"},
{"name": "deviceCategory"},
{"name": "operatingSystem"},
{"name": "browser"},
],
"keepEmptyRows": True,
"dateRanges": [
{"startDate": request_body_params["stream_slice"]["startDate"], "endDate": request_body_params["stream_slice"]["endDate"]}
],
"returnPropertyQuota": True,
"offset": str(0),
"limit": "100000",
}
request_body_json = GoogleAnalyticsDataApiBaseStream(authenticator=MagicMock(), config=patch_base_class["config"]).request_body_json(
**request_body_params
)
assert request_body_json == expected_body_json
def test_changed_page_size(patch_base_class):
request_body_params = {"stream_state": MagicMock(), "stream_slice": MagicMock(), "next_page_token": None}
stream = GoogleAnalyticsDataApiBaseStream(authenticator=MagicMock(), config=patch_base_class["config"])
stream.page_size = 100
request_body_json = stream.request_body_json(**request_body_params)
assert request_body_json["limit"] == "100"
def test_next_page_token_equal_chunk(patch_base_class):
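    # A rowCount of 300000 with the default 100000-row page size yields two further offsets before pagination stops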
stream = GoogleAnalyticsDataApiBaseStream(authenticator=MagicMock(), config=patch_base_class["config"])
response = MagicMock()
response.json.side_effect = [
{"rowCount": 300000},
{"rowCount": 300000},
{"rowCount": 300000},
]
inputs = {"response": response}
expected_tokens = [
{"offset": 100000},
{"offset": 200000},
None,
]
for expected_token in expected_tokens:
assert stream.next_page_token(**inputs) == expected_token
def test_next_page_token(patch_base_class):
stream = GoogleAnalyticsDataApiBaseStream(authenticator=MagicMock(), config=patch_base_class["config"])
response = MagicMock()
response.json.side_effect = [
{"rowCount": 450000},
{"rowCount": 450000},
{"rowCount": 450000},
{"rowCount": 450000},
{"rowCount": 450000},
]
inputs = {"response": response}
expected_tokens = [
{"offset": 100000},
{"offset": 200000},
{"offset": 300000},
{"offset": 400000},
None,
]
for expected_token in expected_tokens:
assert stream.next_page_token(**inputs) == expected_token
def test_parse_response(patch_base_class):
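    # Metric values are cast according to the metricHeaders types, and the configured property_id is added to every parsed record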
stream = GoogleAnalyticsDataApiBaseStream(authenticator=MagicMock(), config=patch_base_class["config"])
response_data = {
"dimensionHeaders": [{"name": "date"}, {"name": "deviceCategory"}, {"name": "operatingSystem"}, {"name": "browser"}],
"metricHeaders": [
{"name": "totalUsers", "type": "TYPE_INTEGER"},
{"name": "newUsers", "type": "TYPE_INTEGER"},
{"name": "sessions", "type": "TYPE_INTEGER"},
{"name": "sessionsPerUser:parameter", "type": "TYPE_FLOAT"},
{"name": "averageSessionDuration", "type": "TYPE_SECONDS"},
{"name": "screenPageViews", "type": "TYPE_INTEGER"},
{"name": "screenPageViewsPerSession", "type": "TYPE_FLOAT"},
{"name": "bounceRate", "type": "TYPE_FLOAT"},
],
"rows": [
{
"dimensionValues": [{"value": "20220731"}, {"value": "desktop"}, {"value": "Macintosh"}, {"value": "Chrome"}],
"metricValues": [
                    {"value": "344.234"},  # a float value that is cast to int because totalUsers is TYPE_INTEGER
                    {"value": "169.345345"},  # a float value that is cast to int because newUsers is TYPE_INTEGER
{"value": "420"},
{"value": "1.2209302325581395"},
{"value": "194.76313766428572"},
{"value": "614"},
{"value": "1.4619047619047618"},
{"value": "0.47857142857142859"},
],
},
{
"dimensionValues": [{"value": "20220731"}, {"value": "desktop"}, {"value": "Windows"}, {"value": "Chrome"}],
"metricValues": [
{"value": "322"},
{"value": "211"},
{"value": "387"},
{"value": "1.2018633540372672"},
{"value": "249.21595714211884"},
{"value": "669"},
{"value": "1.7286821705426356"},
{"value": "0.42377260981912146"},
],
},
],
"rowCount": 54,
"metadata": {"currencyCode": "USD", "timeZone": "America/Los_Angeles"},
"kind": "analyticsData#runReport",
}
expected_data = [
{
"property_id": "108176369",
"date": "20220731",
"deviceCategory": "desktop",
"operatingSystem": "Macintosh",
"browser": "Chrome",
"totalUsers": 344,
"newUsers": 169,
"sessions": 420,
"sessionsPerUser:parameter": 1.2209302325581395,
"averageSessionDuration": 194.76313766428572,
"screenPageViews": 614,
"screenPageViewsPerSession": 1.4619047619047618,
"bounceRate": 0.47857142857142859,
},
{
"property_id": "108176369",
"date": "20220731",
"deviceCategory": "desktop",
"operatingSystem": "Windows",
"browser": "Chrome",
"totalUsers": 322,
"newUsers": 211,
"sessions": 387,
"sessionsPerUser:parameter": 1.2018633540372672,
"averageSessionDuration": 249.21595714211884,
"screenPageViews": 669,
"screenPageViewsPerSession": 1.7286821705426356,
"bounceRate": 0.42377260981912146,
},
]
response = MagicMock()
response.json.return_value = response_data
inputs = {"response": response, "stream_state": {}}
    actual_records: List[Mapping[str, Any]] = list(stream.parse_response(**inputs))
assert actual_records == expected_data
def test_request_headers(patch_base_class):
stream = GoogleAnalyticsDataApiBaseStream(authenticator=MagicMock(), config=patch_base_class["config"])
inputs = {"stream_slice": None, "stream_state": None, "next_page_token": None}
expected_headers = {}
assert stream.request_headers(**inputs) == expected_headers
def test_http_method(patch_base_class):
stream = GoogleAnalyticsDataApiBaseStream(authenticator=MagicMock(), config=patch_base_class["config"])
expected_method = "POST"
assert stream.http_method == expected_method
@pytest.mark.parametrize(
("http_status", "response_action_expected", "response_body"),
[
(HTTPStatus.OK, ResponseAction.SUCCESS, {}),
(HTTPStatus.BAD_REQUEST, ResponseAction.FAIL, {}),
(HTTPStatus.TOO_MANY_REQUESTS, ResponseAction.RETRY, {}),
(HTTPStatus.TOO_MANY_REQUESTS, ResponseAction.RETRY, {"error": {"message": "Exhausted concurrent requests quota."}}),
(HTTPStatus.INTERNAL_SERVER_ERROR, ResponseAction.RETRY, {}),
],
)
def test_should_retry(patch_base_class, http_status, response_action_expected, response_body):
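    # 400 responses fail the sync, while 429 and 5xx responses are retried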
response_mock = Response()
response_mock.status_code = http_status
if response_body:
json_data = response_body
response_mock._content = str.encode(json.dumps(json_data))
response_mock.headers = {"Content-Type": "application/json"}
response_mock.encoding = "utf-8"
stream = GoogleAnalyticsDataApiBaseStream(authenticator=MagicMock(), config=patch_base_class["config"])
assert stream.get_error_handler().interpret_response(response_mock).response_action == response_action_expected
def test_backoff_time(patch_base_class):
response_mock = Response()
stream = GoogleAnalyticsDataApiBaseStream(authenticator=MagicMock(), config=patch_base_class["config"])
expected_backoff_time = None
assert stream.get_backoff_strategy().backoff_time(response_mock) == expected_backoff_time
@freeze_time("2023-01-01 00:00:00")
def test_stream_slices():
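    # Slices run from date_ranges_start_date up to the frozen "today" (or date_ranges_end_date when configured), chunked by window_in_days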
config = {"date_ranges_start_date": datetime.date(2022, 12, 29), "window_in_days": 1, "dimensions": ["date"]}
stream = GoogleAnalyticsDataApiBaseStream(authenticator=None, config=config)
slices = list(stream.stream_slices(sync_mode=None))
assert slices == [
{"startDate": "2022-12-29", "endDate": "2022-12-29"},
{"startDate": "2022-12-30", "endDate": "2022-12-30"},
{"startDate": "2022-12-31", "endDate": "2022-12-31"},
{"startDate": "2023-01-01", "endDate": "2023-01-01"},
]
config = {"date_ranges_start_date": datetime.date(2022, 12, 28), "window_in_days": 2, "dimensions": ["date"]}
stream = GoogleAnalyticsDataApiBaseStream(authenticator=None, config=config)
slices = list(stream.stream_slices(sync_mode=None))
assert slices == [
{"startDate": "2022-12-28", "endDate": "2022-12-29"},
{"startDate": "2022-12-30", "endDate": "2022-12-31"},
{"startDate": "2023-01-01", "endDate": "2023-01-01"},
]
config = {"date_ranges_start_date": datetime.date(2022, 12, 20), "window_in_days": 5, "dimensions": ["date"]}
stream = GoogleAnalyticsDataApiBaseStream(authenticator=None, config=config)
slices = list(stream.stream_slices(sync_mode=None))
assert slices == [
{"startDate": "2022-12-20", "endDate": "2022-12-24"},
{"startDate": "2022-12-25", "endDate": "2022-12-29"},
{"startDate": "2022-12-30", "endDate": "2023-01-01"},
]
config = {
"date_ranges_start_date": datetime.date(2022, 12, 20),
"date_ranges_end_date": datetime.date(2022, 12, 26),
"window_in_days": 5,
"dimensions": ["date"],
}
stream = GoogleAnalyticsDataApiBaseStream(authenticator=None, config=config)
slices = list(stream.stream_slices(sync_mode=None))
assert slices == [
{"startDate": "2022-12-20", "endDate": "2022-12-24"},
{"startDate": "2022-12-25", "endDate": "2022-12-26"},
]
@freeze_time("2023-01-01 00:00:00")
def test_full_refresh():
    """
    Stream slices should cover the whole configured date range even when the full refresh state message is passed.
    """
config = {"date_ranges_start_date": datetime.date(2022, 12, 29), "window_in_days": 1, "dimensions": ["browser", "country", "language"]}
stream = GoogleAnalyticsDataApiBaseStream(authenticator=None, config=config)
full_refresh_state = {"__ab_full_refresh_state_message": True}
slices = list(stream.stream_slices(sync_mode=None, stream_state=full_refresh_state))
assert slices == [
{"startDate": "2022-12-29", "endDate": "2022-12-29"},
{"startDate": "2022-12-30", "endDate": "2022-12-30"},
{"startDate": "2022-12-31", "endDate": "2022-12-31"},
{"startDate": "2023-01-01", "endDate": "2023-01-01"},
]
def test_read_incremental(requests_mock):
config = {
"property_ids": [123],
"property_id": 123,
"date_ranges_start_date": datetime.date(2022, 1, 6),
"window_in_days": 1,
"dimensions": ["yearWeek"],
"metrics": ["totalUsers"],
}
stream = GoogleAnalyticsDataApiBaseStream(authenticator=None, config=config)
stream_state = {}
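    # Mocked responses are served in order, one per daily slice; the first four cover the initial sync from 2022-01-06 to 2022-01-09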
responses = [
{
"dimensionHeaders": [{"name": "yearWeek"}],
"metricHeaders": [{"name": "totalUsers", "type": "TYPE_INTEGER"}],
"rows": [{"dimensionValues": [{"value": "202201"}], "metricValues": [{"value": "100"}]}],
"rowCount": 1,
},
{
"dimensionHeaders": [{"name": "yearWeek"}],
"metricHeaders": [{"name": "totalUsers", "type": "TYPE_INTEGER"}],
"rows": [{"dimensionValues": [{"value": "202201"}], "metricValues": [{"value": "110"}]}],
"rowCount": 1,
},
{
"dimensionHeaders": [{"name": "yearWeek"}],
"metricHeaders": [{"name": "totalUsers", "type": "TYPE_INTEGER"}],
"rows": [{"dimensionValues": [{"value": "202201"}], "metricValues": [{"value": "120"}]}],
"rowCount": 1,
},
{
"dimensionHeaders": [{"name": "yearWeek"}],
"metricHeaders": [{"name": "totalUsers", "type": "TYPE_INTEGER"}],
"rows": [{"dimensionValues": [{"value": "202202"}], "metricValues": [{"value": "130"}]}],
"rowCount": 1,
},
        # responses for the second incremental read
{
"dimensionHeaders": [{"name": "yearWeek"}],
"metricHeaders": [{"name": "totalUsers", "type": "TYPE_INTEGER"}],
"rows": [{"dimensionValues": [{"value": "202202"}], "metricValues": [{"value": "112"}]}],
"rowCount": 1,
},
{
"dimensionHeaders": [{"name": "yearWeek"}],
"metricHeaders": [{"name": "totalUsers", "type": "TYPE_INTEGER"}],
"rows": [{"dimensionValues": [{"value": "202202"}], "metricValues": [{"value": "125"}]}],
"rowCount": 1,
},
{
"dimensionHeaders": [{"name": "yearWeek"}],
"metricHeaders": [{"name": "totalUsers", "type": "TYPE_INTEGER"}],
"rows": [{"dimensionValues": [{"value": "202202"}], "metricValues": [{"value": "140"}]}],
"rowCount": 1,
},
{
"dimensionHeaders": [{"name": "yearWeek"}],
"metricHeaders": [{"name": "totalUsers", "type": "TYPE_INTEGER"}],
"rows": [{"dimensionValues": [{"value": "202202"}], "metricValues": [{"value": "150"}]}],
"rowCount": 1,
},
]
requests_mock.register_uri(
"POST",
"https://analyticsdata.googleapis.com/v1beta/properties/123:runReport",
json=lambda request, context: responses.pop(0),
)
with freeze_time("2022-01-09 12:00:00"):
records = list(read_incremental(stream, stream_state))
assert records == [
{"property_id": 123, "yearWeek": "202201", "totalUsers": 100, "startDate": "2022-01-06", "endDate": "2022-01-06"},
{"property_id": 123, "yearWeek": "202201", "totalUsers": 110, "startDate": "2022-01-07", "endDate": "2022-01-07"},
{"property_id": 123, "yearWeek": "202201", "totalUsers": 120, "startDate": "2022-01-08", "endDate": "2022-01-08"},
{"property_id": 123, "yearWeek": "202202", "totalUsers": 130, "startDate": "2022-01-09", "endDate": "2022-01-09"},
]
assert stream_state == {"yearWeek": "202202"}
with freeze_time("2022-01-10 12:00:00"):
records = list(read_incremental(stream, stream_state))
assert records == [
{"property_id": 123, "yearWeek": "202202", "totalUsers": 112, "startDate": "2022-01-08", "endDate": "2022-01-08"},
{"property_id": 123, "yearWeek": "202202", "totalUsers": 125, "startDate": "2022-01-09", "endDate": "2022-01-09"},
{"property_id": 123, "yearWeek": "202202", "totalUsers": 140, "startDate": "2022-01-10", "endDate": "2022-01-10"},
]
@pytest.mark.parametrize(
"config_dimensions, expected_state",
[
        pytest.param(["browser", "country", "language", "date"], {"date": "20240320"}, id="test_date_cursor_field_dimension"),
        pytest.param(["browser", "country", "language"], {}, id="test_no_date_cursor_field_dimension"),
],
)
def test_get_updated_state(config_dimensions, expected_state):
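    # The state cursor only advances when the "date" dimension is part of the configured custom report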
config = {
"credentials": {
"auth_type": "Service",
"credentials_json": '{ "client_email": "a@gmail.com", "client_id": "1234", "client_secret": "5678", "private_key": "5678"}',
},
"date_ranges_start_date": "2023-04-01",
"window_in_days": 30,
"property_ids": ["123"],
"custom_reports_array": [
{
"name": "pivot_report",
"dateRanges": [{"startDate": "2020-09-01", "endDate": "2020-09-15"}],
"dimensions": config_dimensions,
"metrics": ["sessions"],
"pivots": [
{"fieldNames": ["browser"], "limit": 5},
{"fieldNames": ["country"], "limit": 250},
{"fieldNames": ["language"], "limit": 15},
],
"cohortSpec": {"enabled": "false"},
}
],
}
source = SourceGoogleAnalyticsDataApi()
config = source._validate_and_transform(config, report_names=set())
config["authenticator"] = source.get_authenticator(config)
report_stream = source.instantiate_report_class(config["custom_reports_array"][0], False, config, page_size=100)
actual_state = report_stream.get_updated_state(current_stream_state={}, latest_record={"date": "20240320"})
assert actual_state == expected_state

View File

@@ -1,164 +0,0 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
import json
import sys
from unittest.mock import Mock, mock_open, patch
import pytest
from source_google_analytics_data_api.utils import (
get_source_defined_primary_key,
serialize_to_date_string,
transform_between_filter,
transform_expression,
transform_in_list_filter,
transform_json,
transform_numeric_filter,
transform_string_filter,
)
class TestSerializeToDateString:
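    # serialize_to_date_string converts compound GA4 date values (yearWeek, yearMonth, year) into dates in the requested format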
@pytest.mark.parametrize(
"input_date, date_format, date_type, expected",
[
("202105", "%Y-%m-%d", "yearWeek", "2021-02-01"),
("202105", "%Y-%m-%d", "yearMonth", "2021-05-01"),
("202245", "%Y-%m-%d", "yearWeek", "2022-11-07"),
("202210", "%Y-%m-%d", "yearMonth", "2022-10-01"),
("2022", "%Y-%m-%d", "year", "2022-01-01"),
],
)
def test_valid_cases(self, input_date, date_format, date_type, expected):
result = serialize_to_date_string(input_date, date_format, date_type)
assert result == expected
def test_invalid_type(self):
with pytest.raises(ValueError):
serialize_to_date_string("202105", "%Y-%m-%d", "invalidType")
class TestTransformFilters:
def test_transform_string_filter(self):
filter_data = {"value": "test", "matchType": ["partial"], "caseSensitive": True}
expected = {"stringFilter": {"value": "test", "matchType": "partial", "caseSensitive": True}}
result = transform_string_filter(filter_data)
assert result == expected
def test_transform_in_list_filter(self):
filter_data = {"values": ["test1", "test2"], "caseSensitive": False}
expected = {"inListFilter": {"values": ["test1", "test2"], "caseSensitive": False}}
result = transform_in_list_filter(filter_data)
assert result == expected
def test_transform_numeric_filter(self):
filter_data = {"value": {"value_type": "doubleValue", "value": 5.5}, "operation": ["equals"]}
expected = {"numericFilter": {"value": {"doubleValue": 5.5}, "operation": "equals"}}
result = transform_numeric_filter(filter_data)
assert result == expected
@pytest.mark.parametrize(
"filter_data, expected",
[
(
{"fromValue": {"value_type": "doubleValue", "value": "10.5"}, "toValue": {"value_type": "doubleValue", "value": "20.5"}},
{"betweenFilter": {"fromValue": {"doubleValue": 10.5}, "toValue": {"doubleValue": 20.5}}},
),
(
{"fromValue": {"value_type": "stringValue", "value": "hello"}, "toValue": {"value_type": "stringValue", "value": "world"}},
{"betweenFilter": {"fromValue": {"stringValue": "hello"}, "toValue": {"stringValue": "world"}}},
),
(
{"fromValue": {"value_type": "doubleValue", "value": 10.5}, "toValue": {"value_type": "doubleValue", "value": 20.5}},
{"betweenFilter": {"fromValue": {"doubleValue": 10.5}, "toValue": {"doubleValue": 20.5}}},
),
],
)
def test_transform_between_filter(self, filter_data, expected):
result = transform_between_filter(filter_data)
assert result == expected
class TestTransformExpression:
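    # transform_expression maps a config-style filter definition onto the GA4 FilterExpression shape (fieldName plus a concrete filter)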
@patch("source_google_analytics_data_api.utils.transform_string_filter", Mock(return_value={"stringFilter": "mocked_string_filter"}))
@patch("source_google_analytics_data_api.utils.transform_in_list_filter", Mock(return_value={"inListFilter": "mocked_in_list_filter"}))
@patch("source_google_analytics_data_api.utils.transform_numeric_filter", Mock(return_value={"numericFilter": "mocked_numeric_filter"}))
def test_between_filter(self):
expression = {
"field_name": "some_field",
"filter": {
"filter_name": "betweenFilter",
"fromValue": {"value_type": "doubleValue", "value": "10.5"},
"toValue": {"value_type": "doubleValue", "value": "20.5"},
},
}
expected = {
"filter": {"fieldName": "some_field", "betweenFilter": {"fromValue": {"doubleValue": 10.5}, "toValue": {"doubleValue": 20.5}}}
}
result = transform_expression(expression)
assert result == expected
class TestGetSourceDefinedPrimaryKey:
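    # get_source_defined_primary_key reads the catalog passed via --catalog and returns the matching stream's primary key, or None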
@pytest.mark.parametrize(
"stream_name, mocked_content, expected",
[
("sample_stream", {"streams": [{"stream": {"name": "sample_stream", "source_defined_primary_key": ["id"]}}]}, ["id"]),
("sample_stream", {"streams": [{"stream": {"name": "different_stream", "source_defined_primary_key": ["id"]}}]}, None),
],
)
def test_primary_key(self, stream_name, mocked_content, expected):
sys.argv = ["script_name", "read", "--catalog", "mocked_catalog_path"]
m = mock_open(read_data=json.dumps(mocked_content))
with patch("builtins.open", m):
with patch("json.loads", return_value=mocked_content):
result = get_source_defined_primary_key(stream_name)
assert result == expected
class TestTransformJson:
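    # transform_json rewrites andGroup/orGroup/notExpression/filter nodes, delegating leaf expressions to transform_expression (mocked below)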
@staticmethod
def mock_transform_expression(expression):
return {"transformed": expression}
    # Patch transform_expression with the mock above for every test in this class
@pytest.fixture(autouse=True)
def mock_transform_functions(self, monkeypatch):
monkeypatch.setattr("source_google_analytics_data_api.utils.transform_expression", self.mock_transform_expression)
@pytest.mark.parametrize(
"original, expected",
[
(
{
"filter_type": "andGroup",
"expressions": [{"field": "field1", "condition": "cond1"}, {"field": "field2", "condition": "cond2"}],
},
{
"andGroup": {
"expressions": [
{"transformed": {"field": "field1", "condition": "cond1"}},
{"transformed": {"field": "field2", "condition": "cond2"}},
]
}
},
),
(
{"filter_type": "orGroup", "expressions": [{"field": "field1", "condition": "cond1"}]},
{"orGroup": {"expressions": [{"transformed": {"field": "field1", "condition": "cond1"}}]}},
),
(
{"filter_type": "notExpression", "expression": {"field": "field1", "condition": "cond1"}},
{"notExpression": {"transformed": {"field": "field1", "condition": "cond1"}}},
),
(
{"filter_type": "filter", "field": "field1", "condition": "cond1"},
{"transformed": {"condition": "cond1", "field": "field1", "filter_type": "filter"}},
),
({"filter_type": "andGroup"}, {}),
],
)
def test_cases(self, original, expected):
result = transform_json(original)
assert result == expected

View File

@@ -1,17 +0,0 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
from typing import Any, MutableMapping
from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.streams import Stream
def read_incremental(stream_instance: Stream, stream_state: MutableMapping[str, Any]):
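    # Replays the CDK incremental read loop: iterate slices, read records, and update the stream state after each record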
slices = stream_instance.stream_slices(sync_mode=SyncMode.incremental, stream_state=stream_state)
for _slice in slices:
records = stream_instance.read_records(sync_mode=SyncMode.incremental, stream_slice=_slice, stream_state=stream_state)
for record in records:
stream_state = stream_instance.get_updated_state(stream_state, record)
yield record