Source Google Ads: add custom full_refresh stream (#22703)
Signed-off-by: Sergey Chvalyuk <grubberr@gmail.com>
This commit is contained in:
@@ -686,11 +686,15 @@
|
||||
- name: Google Ads
|
||||
sourceDefinitionId: 253487c0-2246-43ba-a21f-5116b20a2c50
|
||||
dockerRepository: airbyte/source-google-ads
|
||||
dockerImageTag: 0.2.9
|
||||
dockerImageTag: 0.2.10
|
||||
documentationUrl: https://docs.airbyte.com/integrations/sources/google-ads
|
||||
icon: google-adwords.svg
|
||||
sourceType: api
|
||||
releaseStage: generally_available
|
||||
allowedHosts:
|
||||
hosts:
|
||||
- accounts.google.com
|
||||
- googleads.googleapis.com
|
||||
- name: Google Analytics (Universal Analytics)
|
||||
sourceDefinitionId: eff3616a-f9c3-11eb-9a03-0242ac130003
|
||||
dockerRepository: airbyte/source-google-analytics-v4
|
||||
|
||||
@@ -5330,7 +5330,7 @@
|
||||
supportsNormalization: false
|
||||
supportsDBT: false
|
||||
supported_destination_sync_modes: []
|
||||
- dockerImage: "airbyte/source-google-ads:0.2.9"
|
||||
- dockerImage: "airbyte/source-google-ads:0.2.10"
|
||||
spec:
|
||||
documentationUrl: "https://docs.airbyte.com/integrations/sources/google-ads"
|
||||
connectionSpecification:
|
||||
|
||||
@@ -13,5 +13,5 @@ COPY main.py ./
|
||||
|
||||
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]
|
||||
|
||||
LABEL io.airbyte.version=0.2.9
|
||||
LABEL io.airbyte.version=0.2.10
|
||||
LABEL io.airbyte.name=airbyte/source-google-ads
|
||||
|
||||
@@ -19,7 +19,7 @@ acceptance_tests:
|
||||
tests:
|
||||
- config_path: "secrets/config.json"
|
||||
expect_records:
|
||||
path: "integration_tests/expected_records.txt"
|
||||
path: "integration_tests/expected_records.jsonl"
|
||||
timeout_seconds: 600
|
||||
empty_streams:
|
||||
- name: "accounts"
|
||||
|
||||
@@ -218,23 +218,29 @@
|
||||
"stream": {
|
||||
"name": "happytable",
|
||||
"json_schema": {},
|
||||
"supported_sync_modes": ["full_refresh"]
|
||||
"supported_sync_modes": ["full_refresh", "incremental"],
|
||||
"source_defined_cursor": true,
|
||||
"default_cursor_field": ["segments.date"]
|
||||
},
|
||||
"sync_mode": "full_refresh",
|
||||
"destination_sync_mode": "overwrite"
|
||||
"sync_mode": "incremental",
|
||||
"destination_sync_mode": "overwrite",
|
||||
"cursor_field": ["segments.date"]
|
||||
},
|
||||
{
|
||||
"stream": {
|
||||
"name": "unhappytable",
|
||||
"json_schema": {},
|
||||
"supported_sync_modes": ["full_refresh"]
|
||||
"supported_sync_modes": ["full_refresh", "incremental"],
|
||||
"source_defined_cursor": true,
|
||||
"default_cursor_field": ["segments.date"]
|
||||
},
|
||||
"sync_mode": "full_refresh",
|
||||
"destination_sync_mode": "overwrite"
|
||||
"sync_mode": "incremental",
|
||||
"destination_sync_mode": "overwrite",
|
||||
"cursor_field": ["segments.date"]
|
||||
},
|
||||
{
|
||||
"stream": {
|
||||
"name": "ad_group_custom",
|
||||
"name": "custom_audience",
|
||||
"json_schema": {},
|
||||
"supported_sync_modes": ["full_refresh"]
|
||||
},
|
||||
|
||||
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
@@ -2,17 +2,16 @@
|
||||
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
|
||||
#
|
||||
|
||||
import re
|
||||
from functools import lru_cache
|
||||
from typing import Any, Dict, List, Mapping
|
||||
from typing import Any, Dict, Mapping
|
||||
|
||||
from .streams import IncrementalGoogleAdsStream
|
||||
from .streams import GoogleAdsStream, IncrementalGoogleAdsStream
|
||||
from .utils import GAQL
|
||||
|
||||
|
||||
class CustomQuery(IncrementalGoogleAdsStream):
|
||||
def __init__(self, custom_query_config, **kwargs):
|
||||
self.custom_query_config = custom_query_config
|
||||
self.user_defined_query = custom_query_config["query"]
|
||||
class CustomQueryMixin:
|
||||
def __init__(self, config, **kwargs):
|
||||
self.config = config
|
||||
super().__init__(**kwargs)
|
||||
|
||||
@property
|
||||
@@ -22,16 +21,12 @@ class CustomQuery(IncrementalGoogleAdsStream):
|
||||
It will be ignored if provided.
|
||||
If you need to enable it, uncomment the next line instead of `return None` and modify your config
|
||||
"""
|
||||
# return self.custom_query_config.get("primary_key") or None
|
||||
# return self.config.get("primary_key") or None
|
||||
return None
|
||||
|
||||
@property
|
||||
def name(self):
|
||||
return self.custom_query_config["table_name"]
|
||||
|
||||
def get_query(self, stream_slice: Mapping[str, Any] = None) -> str:
|
||||
start_date, end_date = stream_slice.get("start_date"), stream_slice.get("end_date")
|
||||
return self.insert_segments_date_expr(self.user_defined_query, start_date, end_date)
|
||||
return self.config["table_name"]
|
||||
|
||||
# IncrementalGoogleAdsStream uses get_json_schema a lot while parsing
|
||||
# responses, so caching plays a crucial role for performance here.
|
||||
@@ -58,14 +53,15 @@ class CustomQuery(IncrementalGoogleAdsStream):
|
||||
"BOOLEAN": "boolean",
|
||||
"DATE": "string",
|
||||
}
|
||||
fields = CustomQuery.get_query_fields(self.user_defined_query)
|
||||
fields.append(self.cursor_field)
|
||||
fields = list(self.config["query"].fields)
|
||||
if self.cursor_field:
|
||||
fields.append(self.cursor_field)
|
||||
google_schema = self.google_ads_client.get_fields_metadata(fields)
|
||||
|
||||
for field in fields:
|
||||
node = google_schema.get(field)
|
||||
# Data type return in enum format: "GoogleAdsFieldDataType.<data_type>"
|
||||
google_data_type = str(node.data_type).replace("GoogleAdsFieldDataType.", "")
|
||||
google_data_type = node.data_type.name
|
||||
if google_data_type == "ENUM":
|
||||
field_value = {"type": "string", "enum": list(node.enum_values)}
|
||||
elif google_data_type == "MESSAGE":
|
||||
@@ -88,26 +84,15 @@ class CustomQuery(IncrementalGoogleAdsStream):
|
||||
|
||||
return local_json_schema
|
||||
|
||||
# Regexp flags for parsing GAQL query
|
||||
RE_FLAGS = re.DOTALL | re.MULTILINE | re.IGNORECASE
|
||||
# Regexp for getting query columns
|
||||
SELECT_EXPR = re.compile("select(.*)from", flags=RE_FLAGS)
|
||||
WHERE_EXPR = re.compile("where.*", flags=RE_FLAGS)
|
||||
# list of keywords that can come after WHERE clause,
|
||||
# according to https://developers.google.com/google-ads/api/docs/query/grammar
|
||||
# each whitespace matters!
|
||||
KEYWORDS_EXPR = re.compile("(order by| limit | parameters )", flags=RE_FLAGS)
|
||||
|
||||
class IncrementalCustomQuery(CustomQueryMixin, IncrementalGoogleAdsStream):
|
||||
def get_query(self, stream_slice: Mapping[str, Any] = None) -> str:
|
||||
start_date, end_date = stream_slice["start_date"], stream_slice["end_date"]
|
||||
query = self.insert_segments_date_expr(self.config["query"], start_date, end_date)
|
||||
return str(query)
|
||||
|
||||
@staticmethod
|
||||
def get_query_fields(query: str) -> List[str]:
|
||||
fields = CustomQuery.SELECT_EXPR.search(query)
|
||||
if not fields:
|
||||
return []
|
||||
fields = fields.group(1)
|
||||
return [f.strip() for f in fields.split(",")]
|
||||
|
||||
@staticmethod
|
||||
def insert_segments_date_expr(query: str, start_date: str, end_date: str) -> str:
|
||||
def insert_segments_date_expr(query: GAQL, start_date: str, end_date: str) -> GAQL:
|
||||
"""
|
||||
Insert segments.date condition to break query into slices for incremental stream.
|
||||
:param query Origin user defined query
|
||||
@@ -115,35 +100,14 @@ class CustomQuery(IncrementalGoogleAdsStream):
|
||||
:param end_date end date for metric (inclusive)
|
||||
:return Modified query with date window condition included
|
||||
"""
|
||||
# insert segments.date field
|
||||
columns = CustomQuery.SELECT_EXPR.search(query)
|
||||
if not columns:
|
||||
raise Exception("Not valid GAQL expression")
|
||||
columns = columns.group(1)
|
||||
new_columns = columns + ", segments.date\n"
|
||||
result_query = query.replace(columns, new_columns)
|
||||
if "segments.date" not in query.fields:
|
||||
query = query.append_field("segments.date")
|
||||
condition = f"segments.date BETWEEN '{start_date}' AND '{end_date}'"
|
||||
if query.where:
|
||||
return query.set_where(query.where + " AND " + condition)
|
||||
return query.set_where(condition)
|
||||
|
||||
# Modify/insert where condition
|
||||
where_cond = CustomQuery.WHERE_EXPR.search(result_query)
|
||||
if not where_cond:
|
||||
# There is no where condition, insert new one
|
||||
where_location = len(result_query)
|
||||
keywords = CustomQuery.KEYWORDS_EXPR.search(result_query)
|
||||
if keywords:
|
||||
# where condition is not at the end of expression, insert new condition before keyword begins.
|
||||
where_location = keywords.start()
|
||||
result_query = (
|
||||
result_query[0:where_location]
|
||||
+ f"\nWHERE segments.date BETWEEN '{start_date}' AND '{end_date}'\n"
|
||||
+ result_query[where_location:]
|
||||
)
|
||||
return result_query
|
||||
# There is already where condition, add segments.date expression
|
||||
where_cond = where_cond.group(0)
|
||||
keywords = CustomQuery.KEYWORDS_EXPR.search(where_cond)
|
||||
if keywords:
|
||||
# There is some keywords after WHERE condition
|
||||
where_cond = where_cond[0 : keywords.start()]
|
||||
new_where_cond = where_cond + f" AND segments.date BETWEEN '{start_date}' AND '{end_date}'\n"
|
||||
result_query = result_query.replace(where_cond, new_where_cond)
|
||||
return result_query
|
||||
|
||||
class CustomQuery(CustomQueryMixin, GoogleAdsStream):
|
||||
def get_query(self, stream_slice: Mapping[str, Any] = None) -> str:
|
||||
return str(self.config["query"])
|
||||
|
||||
@@ -13,7 +13,7 @@ from airbyte_cdk.sources.streams import Stream
|
||||
from google.ads.googleads.errors import GoogleAdsException
|
||||
from pendulum import parse, today
|
||||
|
||||
from .custom_query_stream import CustomQuery
|
||||
from .custom_query_stream import CustomQuery, IncrementalCustomQuery
|
||||
from .google_ads import GoogleAds
|
||||
from .models import Customer
|
||||
from .streams import (
|
||||
@@ -35,6 +35,9 @@ from .streams import (
|
||||
ShoppingPerformanceReport,
|
||||
UserLocationReport,
|
||||
)
|
||||
from .utils import GAQL
|
||||
|
||||
FULL_REFRESH_CUSTOM_TABLE = ["geo_target_constant", "custom_audience"]
|
||||
|
||||
|
||||
class SourceGoogleAds(AbstractSource):
|
||||
@@ -42,6 +45,8 @@ class SourceGoogleAds(AbstractSource):
|
||||
def _validate_and_transform(config: Mapping[str, Any]):
|
||||
if config.get("end_date") == "":
|
||||
config.pop("end_date")
|
||||
for query in config.get("custom_queries", []):
|
||||
query["query"] = GAQL.parse(query["query"])
|
||||
return config
|
||||
|
||||
@staticmethod
|
||||
@@ -77,10 +82,9 @@ class SourceGoogleAds(AbstractSource):
|
||||
yield accounts_stream.read_records(sync_mode=SyncMode.full_refresh, stream_slice=slice_)
|
||||
|
||||
@staticmethod
|
||||
def is_metrics_in_custom_query(query: str) -> bool:
|
||||
fields = CustomQuery.get_query_fields(query)
|
||||
for field in fields:
|
||||
if field.startswith("metrics"):
|
||||
def is_metrics_in_custom_query(query: GAQL) -> bool:
|
||||
for field in query.fields:
|
||||
if field.split(".")[0] == "metrics":
|
||||
return True
|
||||
return False
|
||||
|
||||
@@ -95,16 +99,18 @@ class SourceGoogleAds(AbstractSource):
|
||||
# Check custom query request validity by sending metric request with non-existent time window
|
||||
for customer in customers:
|
||||
for query in config.get("custom_queries", []):
|
||||
query = query.get("query")
|
||||
query = query["query"]
|
||||
if customer.is_manager_account and self.is_metrics_in_custom_query(query):
|
||||
logger.warning(
|
||||
f"Metrics are not available for manager account {customer.id}. "
|
||||
f"Please remove metrics fields in your custom query: {query}."
|
||||
)
|
||||
if CustomQuery.cursor_field in query:
|
||||
return False, f"Custom query should not contain {CustomQuery.cursor_field}"
|
||||
req_q = CustomQuery.insert_segments_date_expr(query, "1980-01-01", "1980-01-01")
|
||||
response = google_api.send_request(req_q, customer_id=customer.id)
|
||||
if query.resource_name not in FULL_REFRESH_CUSTOM_TABLE:
|
||||
if IncrementalCustomQuery.cursor_field in query.fields:
|
||||
return False, f"Custom query should not contain {IncrementalCustomQuery.cursor_field}"
|
||||
query = IncrementalCustomQuery.insert_segments_date_expr(query, "1980-01-01", "1980-01-01")
|
||||
query = query.set_limit(1)
|
||||
response = google_api.send_request(str(query), customer_id=customer.id)
|
||||
# iterate over the response otherwise exceptions will not be raised!
|
||||
for _ in response:
|
||||
pass
|
||||
@@ -147,10 +153,16 @@ class SourceGoogleAds(AbstractSource):
|
||||
]
|
||||
)
|
||||
for single_query_config in config.get("custom_queries", []):
|
||||
query = single_query_config.get("query")
|
||||
query = single_query_config["query"]
|
||||
if self.is_metrics_in_custom_query(query):
|
||||
if non_manager_accounts:
|
||||
streams.append(CustomQuery(custom_query_config=single_query_config, **non_manager_incremental_config))
|
||||
if query.resource_name in FULL_REFRESH_CUSTOM_TABLE:
|
||||
streams.append(CustomQuery(config=single_query_config, api=google_api, customers=non_manager_accounts))
|
||||
else:
|
||||
streams.append(IncrementalCustomQuery(config=single_query_config, **non_manager_incremental_config))
|
||||
continue
|
||||
streams.append(CustomQuery(custom_query_config=single_query_config, **incremental_config))
|
||||
if query.resource_name in FULL_REFRESH_CUSTOM_TABLE:
|
||||
streams.append(CustomQuery(config=single_query_config, api=google_api, customers=customers))
|
||||
else:
|
||||
streams.append(IncrementalCustomQuery(config=single_query_config, **incremental_config))
|
||||
return streams
|
||||
|
||||
@@ -0,0 +1,91 @@
|
||||
#
|
||||
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
|
||||
#
|
||||
|
||||
import re
|
||||
from dataclasses import dataclass
|
||||
from typing import Optional, Tuple
|
||||
|
||||
|
||||
@dataclass(repr=False, eq=False, frozen=True)
class GAQL:
    """
    Simple regex parser of Google Ads Query Language
    https://developers.google.com/google-ads/api/docs/query/grammar

    A statement is split into its clauses by `parse`; `str()` renders it back as a
    single-line statement. Instances are frozen, so `set_where`, `set_limit` and
    `append_field` return a modified copy instead of mutating in place.
    """

    # Selected field names, in the order they appear after SELECT.
    fields: Tuple[str, ...]
    # Resource named in the FROM clause.
    resource_name: str
    # WHERE condition without the keyword; "" when the clause is absent.
    where: str
    # ORDER BY expression without the keyword; "" when the clause is absent.
    order_by: str
    # LIMIT value; None when the clause is absent.
    limit: Optional[int]
    # PARAMETERS expression without the keyword; "" when the clause is absent.
    parameters: str

    # FieldNames is matched lazily so the statement is split at the FIRST
    # "<space>FROM<space>": field names can never contain whitespace, so any later
    # FROM can only belong to a quoted literal in a following clause.
    REGEX = re.compile(
        r"""\s*
            SELECT\s+(?P<FieldNames>\S.*?)
            \s+
            FROM\s+(?P<ResourceName>[a-z]([a-zA-Z_])*)
            \s*
            (\s+WHERE\s+(?P<WhereClause>\S.*?))?
            (\s+ORDER\s+BY\s+(?P<OrderByClause>\S.*?))?
            (\s+LIMIT\s+(?P<LimitClause>[1-9]([0-9])*))?
            \s*
            (\s+PARAMETERS\s+(?P<ParametersClause>\S.*?))?
            $""",
        flags=re.I | re.DOTALL | re.VERBOSE,
    )

    REGEX_FIELD_NAME = re.compile(r"^[a-z][a-z0-9._]*$", re.I)

    # Matches either a complete quoted literal (captured, so it can be kept verbatim)
    # or a run of whitespace (to be collapsed).
    # NOTE(review): backslash-escaped quotes are not treated specially — confirm
    # whether GAQL supports them.
    REGEX_LITERAL_OR_SPACE = re.compile(r"""('[^']*'|"[^"]*")|\s+""")

    @classmethod
    def parse(cls, query: str) -> "GAQL":
        """
        Build a GAQL instance from a raw statement.

        :param query GAQL statement; keywords are matched case-insensitively
        :return parsed statement with whitespace-normalized clauses
        :raises ValueError if the statement does not match the supported grammar
                or any selected field name is malformed
        """
        m = cls.REGEX.match(query)
        if not m:
            raise ValueError(f"incorrect GAQL query statement: {repr(query)}")

        fields = tuple(f.strip() for f in m.group("FieldNames").split(","))
        # An empty name (", ,") or a name with inner whitespace fails this check.
        if not all(cls.REGEX_FIELD_NAME.match(field) for field in fields):
            raise ValueError(f"incorrect GAQL query statement: {repr(query)}")

        limit = m.group("LimitClause")
        return cls(
            fields,
            m.group("ResourceName"),
            cls._normalize(m.group("WhereClause") or ""),
            cls._normalize(m.group("OrderByClause") or ""),
            int(limit) if limit else None,
            cls._normalize(m.group("ParametersClause") or ""),
        )

    def __str__(self):
        """Render the statement on one line, omitting clauses that are not set."""
        parts = [f"SELECT {', '.join(self.fields)} FROM {self.resource_name}"]
        if self.where:
            parts.append("WHERE " + self.where)
        if self.order_by:
            parts.append("ORDER BY " + self.order_by)
        if self.limit is not None:
            parts.append("LIMIT " + str(self.limit))
        if self.parameters:
            parts.append("PARAMETERS " + self.parameters)
        return " ".join(parts)

    def __repr__(self):
        return self.__str__()

    @staticmethod
    def _normalize(s):
        """
        Strip the clause and collapse every whitespace run to a single space.

        Text inside single- or double-quoted literals is left untouched, because
        whitespace there is part of the value being compared.
        """
        return GAQL.REGEX_LITERAL_OR_SPACE.sub(lambda m: m.group(1) or " ", s.strip())

    def set_where(self, value: str):
        """Return a copy of the statement with the WHERE condition replaced by `value`."""
        return self.__class__(self.fields, self.resource_name, value, self.order_by, self.limit, self.parameters)

    def set_limit(self, value: int):
        """Return a copy of the statement with the LIMIT replaced by `value`."""
        return self.__class__(self.fields, self.resource_name, self.where, self.order_by, value, self.parameters)

    def append_field(self, value):
        """Return a copy of the statement with `value` added as the last selected field."""
        return self.__class__(
            self.fields + (value,), self.resource_name, self.where, self.order_by, self.limit, self.parameters
        )
|
||||
@@ -6,7 +6,7 @@ import pytest
|
||||
from source_google_ads.models import Customer
|
||||
|
||||
|
||||
@pytest.fixture(scope="session", name="config")
|
||||
@pytest.fixture(name="config")
|
||||
def test_config():
|
||||
config = {
|
||||
"credentials": {
|
||||
|
||||
@@ -2,7 +2,8 @@
|
||||
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
|
||||
#
|
||||
|
||||
from source_google_ads.custom_query_stream import CustomQuery
|
||||
from source_google_ads.custom_query_stream import IncrementalCustomQuery
|
||||
from source_google_ads.utils import GAQL
|
||||
|
||||
|
||||
def test_custom_query():
|
||||
@@ -11,15 +12,8 @@ def test_custom_query():
|
||||
ad_group.url_custom_parameters, campaign.accessible_bidding_strategy, campaign.ad_serving_optimization_status,
|
||||
campaign.advertising_channel_type, campaign.advertising_channel_sub_type, campaign.app_campaign_setting.app_id,
|
||||
campaign.app_campaign_setting.app_store FROM search_term_view"""
|
||||
output_q = CustomQuery.insert_segments_date_expr(input_q, "1980-01-01", "1980-01-01")
|
||||
output_q = IncrementalCustomQuery.insert_segments_date_expr(GAQL.parse(input_q), "1980-01-01", "1980-01-01")
|
||||
assert (
|
||||
output_q
|
||||
== """SELECT ad_group.resource_name, ad_group.status, ad_group.target_cpa_micros, ad_group.target_cpm_micros,
|
||||
ad_group.target_roas, ad_group.targeting_setting.target_restrictions, ad_group.tracking_url_template, ad_group.type,
|
||||
ad_group.url_custom_parameters, campaign.accessible_bidding_strategy, campaign.ad_serving_optimization_status,
|
||||
campaign.advertising_channel_type, campaign.advertising_channel_sub_type, campaign.app_campaign_setting.app_id,
|
||||
campaign.app_campaign_setting.app_store , segments.date
|
||||
FROM search_term_view
|
||||
WHERE segments.date BETWEEN '1980-01-01' AND '1980-01-01'
|
||||
"""
|
||||
str(output_q)
|
||||
== """SELECT ad_group.resource_name, ad_group.status, ad_group.target_cpa_micros, ad_group.target_cpm_micros, ad_group.target_roas, ad_group.targeting_setting.target_restrictions, ad_group.tracking_url_template, ad_group.type, ad_group.url_custom_parameters, campaign.accessible_bidding_strategy, campaign.ad_serving_optimization_status, campaign.advertising_channel_type, campaign.advertising_channel_sub_type, campaign.app_campaign_setting.app_id, campaign.app_campaign_setting.app_store, segments.date FROM search_term_view WHERE segments.date BETWEEN '1980-01-01' AND '1980-01-01'"""
|
||||
)
|
||||
|
||||
@@ -2,6 +2,7 @@
|
||||
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
|
||||
#
|
||||
|
||||
import re
|
||||
from collections import namedtuple
|
||||
from unittest.mock import Mock
|
||||
|
||||
@@ -11,10 +12,11 @@ from freezegun import freeze_time
|
||||
from google.ads.googleads.errors import GoogleAdsException
|
||||
from google.ads.googleads.v11.errors.types.authorization_error import AuthorizationErrorEnum
|
||||
from pendulum import today
|
||||
from source_google_ads.custom_query_stream import CustomQuery
|
||||
from source_google_ads.custom_query_stream import IncrementalCustomQuery
|
||||
from source_google_ads.google_ads import GoogleAds
|
||||
from source_google_ads.source import SourceGoogleAds
|
||||
from source_google_ads.streams import AdGroupAdReport, AdGroupLabels, ServiceAccounts, chunk_date_range
|
||||
from source_google_ads.utils import GAQL
|
||||
|
||||
from .common import MockErroringGoogleAdsClient, MockGoogleAdsClient, make_google_ads_exception
|
||||
|
||||
@@ -59,11 +61,12 @@ def mocked_gads_api(mocker):
|
||||
|
||||
@pytest.fixture()
|
||||
def mock_fields_meta_data():
|
||||
DataType = namedtuple("DataType", ["name"])
|
||||
Node = namedtuple("Node", ["data_type", "name", "enum_values", "is_repeated"])
|
||||
nodes = (
|
||||
Node("RESOURCE_NAME", "campaign.accessible_bidding_strategy", [], False),
|
||||
Node(DataType("RESOURCE_NAME"), "campaign.accessible_bidding_strategy", [], False),
|
||||
Node(
|
||||
"ENUM",
|
||||
DataType("ENUM"),
|
||||
"segments.ad_destination_type",
|
||||
[
|
||||
"APP_DEEP_LINK",
|
||||
@@ -82,22 +85,22 @@ def mock_fields_meta_data():
|
||||
],
|
||||
False,
|
||||
),
|
||||
Node("DATE", "campaign.start_date", [], is_repeated=False),
|
||||
Node("DATE", "campaign.end_date", [], False),
|
||||
Node("DATE", "segments.date", [], False),
|
||||
Node(DataType("DATE"), "campaign.start_date", [], is_repeated=False),
|
||||
Node(DataType("DATE"), "campaign.end_date", [], False),
|
||||
Node(DataType("DATE"), "segments.date", [], False),
|
||||
Node(
|
||||
"ENUM",
|
||||
DataType("ENUM"),
|
||||
"accessible_bidding_strategy.target_impression_share.location",
|
||||
["ABSOLUTE_TOP_OF_PAGE", "ANYWHERE_ON_PAGE", "TOP_OF_PAGE", "UNKNOWN", "UNSPECIFIED"],
|
||||
False,
|
||||
),
|
||||
Node("STRING", "campaign.name", [], False),
|
||||
Node("DOUBLE", "campaign.optimization_score", [], False),
|
||||
Node("RESOURCE_NAME", "campaign.resource_name", [], False),
|
||||
Node("INT32", "campaign.shopping_setting.campaign_priority", [], False),
|
||||
Node("INT64", "campaign.shopping_setting.merchant_id", [], False),
|
||||
Node("BOOLEAN", "campaign_budget.explicitly_shared", [], False),
|
||||
Node("MESSAGE", "bidding_strategy.enhanced_cpc", [], False),
|
||||
Node(DataType("STRING"), "campaign.name", [], False),
|
||||
Node(DataType("DOUBLE"), "campaign.optimization_score", [], False),
|
||||
Node(DataType("RESOURCE_NAME"), "campaign.resource_name", [], False),
|
||||
Node(DataType("INT32"), "campaign.shopping_setting.campaign_priority", [], False),
|
||||
Node(DataType("INT64"), "campaign.shopping_setting.merchant_id", [], False),
|
||||
Node(DataType("BOOLEAN"), "campaign_budget.explicitly_shared", [], False),
|
||||
Node(DataType("MESSAGE"), "bidding_strategy.enhanced_cpc", [], False),
|
||||
)
|
||||
return Mock(get_fields_metadata=Mock(return_value={node.name: node for node in nodes}))
|
||||
|
||||
@@ -159,7 +162,7 @@ def test_streams_count(config, mock_account_info):
|
||||
)
|
||||
def test_metrics_in_custom_query(query, is_metrics_in_query):
|
||||
source = SourceGoogleAds()
|
||||
assert source.is_metrics_in_custom_query(query) is is_metrics_in_query
|
||||
assert source.is_metrics_in_custom_query(GAQL.parse(query)) is is_metrics_in_query
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
@@ -181,51 +184,16 @@ def test_updated_state(stream_mock, latest_record, current_state, expected_state
|
||||
def stream_instance(query, api_mock, **kwargs):
|
||||
start_date = "2021-03-04"
|
||||
conversion_window_days = 14
|
||||
instance = CustomQuery(
|
||||
instance = IncrementalCustomQuery(
|
||||
api=api_mock,
|
||||
conversion_window_days=conversion_window_days,
|
||||
start_date=start_date,
|
||||
custom_query_config={"query": query, "table_name": "whatever_table"},
|
||||
config={"query": GAQL.parse(query), "table_name": "whatever_table"},
|
||||
**kwargs,
|
||||
)
|
||||
return instance
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"query, fields",
|
||||
[
|
||||
(
|
||||
"""
|
||||
SELECT
|
||||
campaign.id,
|
||||
campaign.name,
|
||||
campaign.status,
|
||||
metrics.impressions
|
||||
FROM campaign
|
||||
WHERE campaign.status = 'PAUSED'
|
||||
AND metrics.impressions > 100
|
||||
ORDER BY campaign.status
|
||||
""",
|
||||
["campaign.id", "campaign.name", "campaign.status", "metrics.impressions"],
|
||||
),
|
||||
(
|
||||
"""
|
||||
SELECT
|
||||
campaign.accessible_bidding_strategy,
|
||||
segments.ad_destination_type,
|
||||
campaign.start_date,
|
||||
campaign.end_date
|
||||
FROM campaign
|
||||
""",
|
||||
["campaign.accessible_bidding_strategy", "segments.ad_destination_type", "campaign.start_date", "campaign.end_date"],
|
||||
),
|
||||
("""selet aasdasd from aaa""", []),
|
||||
],
|
||||
)
|
||||
def test_get_query_fields(query, fields):
|
||||
assert CustomQuery.get_query_fields(query) == fields
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"original_query, expected_query",
|
||||
[
|
||||
@@ -246,8 +214,8 @@ SELECT
|
||||
campaign.id,
|
||||
campaign.name,
|
||||
campaign.status,
|
||||
metrics.impressions
|
||||
, segments.date
|
||||
metrics.impressions,
|
||||
segments.date
|
||||
FROM campaign
|
||||
WHERE campaign.status = 'PAUSED'
|
||||
AND metrics.impressions > 100
|
||||
@@ -270,8 +238,8 @@ SELECT
|
||||
campaign.id,
|
||||
campaign.name,
|
||||
campaign.status,
|
||||
metrics.impressions
|
||||
, segments.date
|
||||
metrics.impressions,
|
||||
segments.date
|
||||
FROM campaign
|
||||
|
||||
WHERE segments.date BETWEEN '1980-01-01' AND '2000-01-01'
|
||||
@@ -294,8 +262,8 @@ SELECT
|
||||
campaign.id,
|
||||
campaign.name,
|
||||
campaign.status,
|
||||
metrics.impressions
|
||||
, segments.date
|
||||
metrics.impressions,
|
||||
segments.date
|
||||
FROM campaign
|
||||
WHERE campaign.status = 'PAUSED'
|
||||
AND metrics.impressions > 100
|
||||
@@ -316,8 +284,8 @@ SELECT
|
||||
campaign.accessible_bidding_strategy,
|
||||
segments.ad_destination_type,
|
||||
campaign.start_date,
|
||||
campaign.end_date
|
||||
, segments.date
|
||||
campaign.end_date,
|
||||
segments.date
|
||||
FROM campaign
|
||||
|
||||
WHERE segments.date BETWEEN '1980-01-01' AND '2000-01-01'
|
||||
@@ -326,7 +294,8 @@ WHERE segments.date BETWEEN '1980-01-01' AND '2000-01-01'
|
||||
],
|
||||
)
|
||||
def test_insert_date(original_query, expected_query):
|
||||
assert CustomQuery.insert_segments_date_expr(original_query, "1980-01-01", "2000-01-01") == expected_query
|
||||
expected_query = re.sub(r"\s+", " ", expected_query.strip())
|
||||
assert str(IncrementalCustomQuery.insert_segments_date_expr(GAQL.parse(original_query), "1980-01-01", "2000-01-01")) == expected_query
|
||||
|
||||
|
||||
def test_get_json_schema_parse_query(mock_fields_meta_data, customers):
|
||||
|
||||
@@ -0,0 +1,116 @@
|
||||
#
|
||||
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
|
||||
#
|
||||
|
||||
import pytest
|
||||
from source_google_ads.utils import GAQL
|
||||
|
||||
|
||||
def test_parse_GAQL_ok():
    """Parse well-formed statements and check every clause plus the rendered string."""
    full_statement = """
SELECT field1, field2
FROM x_Table
WHERE date = '2020-01-01'
ORDER BY field2 ASC, field1 DESC
LIMIT 10
PARAMETERS include_drafts=true """

    # (statement, fields, resource_name, where, order_by, limit, parameters, rendered)
    cases = [
        ("SELECT field FROM table", ("field",), "table", "", "", None, "", "SELECT field FROM table"),
        (
            "SELECT field1, field2 FROM x_Table ",
            ("field1", "field2"),
            "x_Table",
            "",
            "",
            None,
            "",
            "SELECT field1, field2 FROM x_Table",
        ),
        (
            "SELECT field1, field2 FROM x_Table WHERE date = '2020-01-01' ",
            ("field1", "field2"),
            "x_Table",
            "date = '2020-01-01'",
            "",
            None,
            "",
            "SELECT field1, field2 FROM x_Table WHERE date = '2020-01-01'",
        ),
        (
            "SELECT field1, field2 FROM x_Table WHERE date = '2020-01-01' ORDER BY field2, field1 ",
            ("field1", "field2"),
            "x_Table",
            "date = '2020-01-01'",
            "field2, field1",
            None,
            "",
            "SELECT field1, field2 FROM x_Table WHERE date = '2020-01-01' ORDER BY field2, field1",
        ),
        (
            "SELECT t.field1, t.field2 FROM x_Table ORDER BY field2, field1 LIMIT 10 ",
            ("t.field1", "t.field2"),
            "x_Table",
            "",
            "field2, field1",
            10,
            "",
            "SELECT t.field1, t.field2 FROM x_Table ORDER BY field2, field1 LIMIT 10",
        ),
        (
            full_statement,
            ("field1", "field2"),
            "x_Table",
            "date = '2020-01-01'",
            "field2 ASC, field1 DESC",
            10,
            "include_drafts=true",
            "SELECT field1, field2 FROM x_Table WHERE date = '2020-01-01' ORDER BY field2 ASC, field1 DESC LIMIT 10 PARAMETERS include_drafts=true",
        ),
    ]

    for statement, fields, resource_name, where, order_by, limit, parameters, rendered in cases:
        sql = GAQL.parse(statement)
        assert sql.fields == fields
        assert sql.resource_name == resource_name
        assert sql.where == where
        assert sql.order_by == order_by
        # An absent LIMIT clause must surface as None rather than 0.
        if limit is None:
            assert sql.limit is None
        else:
            assert sql.limit == limit
        assert sql.parameters == parameters
        assert str(sql) == rendered
|
||||
|
||||
|
||||
def test_parse_GAQL_fail():
    """Every malformed statement must be rejected with the parser's error message."""
    invalid_queries = (
        # Resource name contains a digit.
        "SELECT field1, field2 FROM x_Table2",
        # WHERE keyword with no condition after it.
        "SELECT field1, field2 FROM x_Table WHERE ",
        # Empty entry in the field list.
        "SELECT field1, , field2 FROM table",
        # Whitespace inside a field name.
        "SELECT fie ld1, field2 FROM table",
    )

    for query in invalid_queries:
        with pytest.raises(Exception) as e:
            GAQL.parse(query)
        # Check the message for every case, not only the first one.
        assert str(e.value) == f"incorrect GAQL query statement: {query!r}"
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
    ("query", "fields"),
    [
        pytest.param(
            """
SELECT
campaign.id,
campaign.name,
campaign.status,
metrics.impressions
FROM campaign
WHERE campaign.status = 'PAUSED'
AND metrics.impressions > 100
ORDER BY campaign.status
""",
            ["campaign.id", "campaign.name", "campaign.status", "metrics.impressions"],
        ),
        pytest.param(
            """
SELECT
campaign.accessible_bidding_strategy,
segments.ad_destination_type,
campaign.start_date,
campaign.end_date
FROM campaign
""",
            ["campaign.accessible_bidding_strategy", "segments.ad_destination_type", "campaign.start_date", "campaign.end_date"],
        ),
    ],
)
def test_get_query_fields(query, fields):
    """The parsed statement must expose the selected fields in the order written."""
    parsed_fields = GAQL.parse(query).fields
    assert list(parsed_fields) == fields
|
||||
@@ -86,7 +86,7 @@
|
||||
| **Glassfrog** | <img alt="Glassfrog icon" src="https://raw.githubusercontent.com/airbytehq/airbyte/master/airbyte-config/init/src/main/resources/icons/glassfrog.svg" height="30" height="30"/> | Source | airbyte/source-glassfrog:0.1.0 | alpha | [link](https://docs.airbyte.com/integrations/sources/glassfrog) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-glassfrog) | <small>`cf8ff320-6272-4faa-89e6-4402dc17e5d5`</small> |
|
||||
| **GoCardless** | <img alt="GoCardless icon" src="https://raw.githubusercontent.com/airbytehq/airbyte/master/airbyte-config/init/src/main/resources/icons/gocardless.svg" height="30" height="30"/> | Source | airbyte/source-gocardless:0.1.0 | alpha | [link](https://docs.airbyte.com/integrations/sources/gocardless) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-gocardless) | <small>`ba15ac82-5c6a-4fb2-bf24-925c23a1180c`</small> |
|
||||
| **Gong** | <img alt="Gong icon" src="https://raw.githubusercontent.com/airbytehq/airbyte/master/airbyte-config/init/src/main/resources/icons/gong.svg" height="30" height="30"/> | Source | airbyte/source-gong:0.1.0 | alpha | [link](https://docs.airbyte.com/integrations/sources/gong) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-gong) | <small>`32382e40-3b49-4b99-9c5c-4076501914e7`</small> |
|
||||
| **Google Ads** | <img alt="Google Ads icon" src="https://raw.githubusercontent.com/airbytehq/airbyte/master/airbyte-config/init/src/main/resources/icons/google-adwords.svg" height="30" height="30"/> | Source | airbyte/source-google-ads:0.2.9 | generally_available | [link](https://docs.airbyte.com/integrations/sources/google-ads) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-google-ads) | <small>`253487c0-2246-43ba-a21f-5116b20a2c50`</small> |
|
||||
| **Google Ads** | <img alt="Google Ads icon" src="https://raw.githubusercontent.com/airbytehq/airbyte/master/airbyte-config/init/src/main/resources/icons/google-adwords.svg" height="30"/> | Source | airbyte/source-google-ads:0.2.10 | generally_available | [link](https://docs.airbyte.com/integrations/sources/google-ads) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-google-ads) | <small>`253487c0-2246-43ba-a21f-5116b20a2c50`</small> |
|
||||
| **Google Analytics (Universal Analytics)** | <img alt="Google Analytics (Universal Analytics) icon" src="https://raw.githubusercontent.com/airbytehq/airbyte/master/airbyte-config/init/src/main/resources/icons/google-analytics.svg" height="30" height="30"/> | Source | airbyte/source-google-analytics-v4:0.1.34 | generally_available | [link](https://docs.airbyte.com/integrations/sources/google-analytics-universal-analytics) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-google-analytics-v4) | <small>`eff3616a-f9c3-11eb-9a03-0242ac130003`</small> |
|
||||
| **Google Analytics 4 (GA4)** | <img alt="Google Analytics 4 (GA4) icon" src="https://raw.githubusercontent.com/airbytehq/airbyte/master/airbyte-config/init/src/main/resources/icons/google-analytics.svg" height="30" height="30"/> | Source | airbyte/source-google-analytics-data-api:0.1.1 | beta | [link](https://docs.airbyte.com/integrations/sources/google-analytics-v4) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-google-analytics-data-api) | <small>`3cc2eafd-84aa-4dca-93af-322d9dfeec1a`</small> |
|
||||
| **Google Directory** | <img alt="Google Directory icon" src="https://raw.githubusercontent.com/airbytehq/airbyte/master/airbyte-config/init/src/main/resources/icons/googledirectory.svg" height="30" height="30"/> | Source | airbyte/source-google-directory:0.1.9 | alpha | [link](https://docs.airbyte.com/integrations/sources/google-directory) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-google-directory) | <small>`d19ae824-e289-4b14-995a-0632eb46d246`</small> |
|
||||
|
||||
@@ -139,6 +139,7 @@ Due to a limitation in the Google Ads API which does not allow getting performan
|
||||
|
||||
| Version | Date | Pull Request | Subject |
|
||||
|:---------|:-----------|:---------------------------------------------------------|:-------------------------------------------------------------------------------------------------------------------------------------|
|
||||
| `0.2.10` | 2023-02-11 | [22703](https://github.com/airbytehq/airbyte/pull/22703) | Add support for custom full_refresh streams |
|
||||
| `0.2.9` | 2023-01-23 | [21705](https://github.com/airbytehq/airbyte/pull/21705) | Fix multibyte issue; Bump google-ads package to 19.0.0 |
|
||||
| `0.2.8` | 2023-01-18 | [21517](https://github.com/airbytehq/airbyte/pull/21517) | Write fewer logs |
|
||||
| `0.2.7` | 2023-01-10 | [20755](https://github.com/airbytehq/airbyte/pull/20755) | Add more logs to debug stuck syncs |
|
||||
|
||||
Reference in New Issue
Block a user