# airbyte/airbyte-integrations/connectors/source-linkedin-ads/components.py
#
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
#
import json
from dataclasses import dataclass
from typing import Any, Dict, Iterable, List, Mapping, MutableMapping, Optional, Union
from urllib.parse import urlencode
import requests
from requests.exceptions import InvalidURL
from airbyte_cdk.models import FailureType
from airbyte_cdk.sources.declarative.extractors.record_extractor import RecordExtractor
from airbyte_cdk.sources.declarative.requesters import HttpRequester
from airbyte_cdk.sources.declarative.requesters.error_handlers import DefaultErrorHandler
from airbyte_cdk.sources.declarative.requesters.request_options.interpolated_request_options_provider import (
InterpolatedRequestOptionsProvider,
RequestInput,
)
from airbyte_cdk.sources.streams.http import HttpClient
from airbyte_cdk.sources.streams.http.error_handlers import ErrorResolution, ResponseAction
from airbyte_cdk.sources.streams.http.exceptions import DefaultBackoffException, RequestBodyException, UserDefinedBackoffException
from airbyte_cdk.sources.streams.http.http import BODY_REQUEST_METHODS
from airbyte_cdk.utils.datetime_helpers import AirbyteDateTime, ab_datetime_parse
# Replace `pivot` with `_pivot` to allow Redshift normalization,
# since `pivot` is a reserved keyword for Destination Redshift.
# See https://github.com/airbytehq/airbyte/issues/13018.
# Expand this list if required.
DESTINATION_RESERVED_KEYWORDS: list = ["pivot"]
class SafeHttpClient(HttpClient):
"""
A custom HTTP client that URL-encodes query parameters while preserving the characters ():,%
(they are not percent-encoded), which the LinkedIn API expects to remain unescaped in its query syntax.
"""
def _create_prepared_request(
self,
http_method: str,
url: str,
dedupe_query_params: bool = False,
headers: Optional[Mapping[str, str]] = None,
params: Optional[Mapping[str, str]] = None,
json: Optional[Mapping[str, Any]] = None,
data: Optional[Union[str, Mapping[str, Any]]] = None,
) -> requests.PreparedRequest:
"""
Prepares an HTTP request with optional deduplication of query parameters and safe encoding.
"""
if dedupe_query_params:
query_params = self._dedupe_query_params(url, params)
else:
query_params = params or {}
query_params = urlencode(query_params, safe="():,%")
args = {"method": http_method, "url": url, "headers": headers, "params": query_params}
if http_method.upper() in BODY_REQUEST_METHODS:
if json and data:
raise RequestBodyException(
"At the same time only one of the 'request_body_data' and 'request_body_json' functions can return data"
)
elif json:
args["json"] = json
elif data:
args["data"] = data
prepared_request: requests.PreparedRequest = self._session.prepare_request(requests.Request(**args))
return prepared_request
@dataclass
class SafeEncodeHttpRequester(HttpRequester):
"""
A custom HTTP requester that ensures query parameters are URL-encoded with the characters ():,% preserved (not percent-encoded).
"""
request_body_json: Optional[RequestInput] = None
request_headers: Optional[RequestInput] = None
request_parameters: Optional[RequestInput] = None
request_body_data: Optional[RequestInput] = None
query_properties_key: Optional[str] = None
def __post_init__(self, parameters: Mapping[str, Any]) -> None:
"""
Initializes the request options provider with the provided parameters and any
configured request components like headers, parameters, or bodies.
"""
self.request_options_provider = InterpolatedRequestOptionsProvider(
request_body_data=self.request_body_data,
request_body_json=self.request_body_json,
request_headers=self.request_headers,
request_parameters=self.request_parameters,
query_properties_key=self.query_properties_key,
config=self.config,
parameters=parameters or {},
)
super().__post_init__(parameters)
if self.error_handler is not None and hasattr(self.error_handler, "backoff_strategies"):
backoff_strategies = self.error_handler.backoff_strategies
else:
backoff_strategies = None
self._http_client = SafeHttpClient(
name=self.name,
logger=self.logger,
error_handler=self.error_handler,
authenticator=self._authenticator,
use_cache=self.use_cache,
backoff_strategy=backoff_strategies,
disable_retries=self.disable_retries,
message_repository=self.message_repository,
)
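# This requester is wired up from the connector's declarative manifest rather than
# instantiated directly. An illustrative (assumed, not verbatim) manifest snippet:
#   requester:
#     type: CustomRequester
#     class_name: source_linkedin_ads.components.SafeEncodeHttpRequester
#     url_base: "https://api.linkedin.com/rest"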
@dataclass
class LinkedInAdsRecordExtractor(RecordExtractor):
"""
Extracts and transforms LinkedIn Ads records, ensuring that 'lastModified' and 'created'
date-time fields are formatted to RFC3339.
"""
def _date_time_to_rfc3339(self, record: MutableMapping[str, Any]) -> MutableMapping[str, Any]:
"""
Converts 'lastModified' and 'created' fields in the record to RFC3339 format.
"""
for item in ["lastModified", "created"]:
if record.get(item) is not None:
record[item] = ab_datetime_parse(record[item]).to_datetime().isoformat()
return record
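# A sketch of the conversion (the input value is assumed, not taken from a real record):
#   {"created": "2021-08-21T21:27:55.000Z"} would become roughly
#   {"created": "2021-08-21T21:27:55+00:00"} after ab_datetime_parse(...).to_datetime().isoformat().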
def extract_records(self, response: requests.Response) -> Iterable[Mapping[str, Any]]:
"""
Extracts and transforms records from an HTTP response.
"""
for record in transform_data(response.json().get("elements")):
yield self._date_time_to_rfc3339(record)
@dataclass
class LinkedInAdsErrorHandler(DefaultErrorHandler):
"""
An error handler for LinkedIn Ads that interprets responses, providing custom error resolutions
for specific exceptions like `InvalidURL`.
This is a temporary workaround until this is updated in the CDK. https://github.com/airbytehq/airbyte-internal-issues/issues/11320
"""
def interpret_response(self, response_or_exception: Optional[Union[requests.Response, Exception]]) -> ErrorResolution:
"""
Interprets responses and exceptions, providing custom error resolutions for specific exceptions.
"""
if isinstance(response_or_exception, InvalidURL):
return ErrorResolution(
response_action=ResponseAction.RETRY,
failure_type=FailureType.transient_error,
error_message="source-linkedin-ads has faced a temporary DNS resolution issue. Retrying...",
)
return super().interpret_response(response_or_exception)
def transform_change_audit_stamps(
record: Dict, dict_key: str = "changeAuditStamps", props: List = ["created", "lastModified"], fields: List = ["time"]
) -> Mapping[str, Any]:
"""
:: EXAMPLE `changeAuditStamps` input structure:
{
"changeAuditStamps": {
"created": {"time": 1629581275000},
"lastModified": {"time": 1629664544760}
}
}
:: EXAMPLE output:
{
"created": "2021-08-21 21:27:55",
"lastModified": "2021-08-22 20:35:44"
}
"""
target_dict: Dict = record.get(dict_key)
for prop in props:
# Update the dict with flattened key:value pairs
for field in fields:
record[prop] = ab_datetime_parse(target_dict.get(prop).get(field) // 1000).to_datetime().strftime("%Y-%m-%d %H:%M:%S")
record.pop(dict_key)
return record
def date_str_from_date_range(record: Dict, prefix: str) -> str:
"""
Builds an ISO 8601 date string from the record's flattened <prefix>.<date part> fields.
EXAMPLE:
Input: record
{
"start.year": 2021, "start.month": 8, "start.day": 13,
"end.year": 2021, "end.month": 9, "end.day": 30
}
EXAMPLE output:
With `prefix` = "start"
str: "2021-08-13"
With `prefix` = "end"
str: "2021-09-30"
"""
year = record.get(f"{prefix}.year")
month = record.get(f"{prefix}.month")
day = record.get(f"{prefix}.day")
return AirbyteDateTime(year, month, day).to_datetime().strftime("%Y-%m-%d")
def transform_date_range(
record: Dict,
dict_key: str = "dateRange",
props: List = ["start", "end"],
fields: List = ["year", "month", "day"],
) -> Mapping[str, Any]:
"""
:: EXAMPLE `dateRange` input structure in Analytics streams:
{
"dateRange": {
"start": {"month": 8, "day": 13, "year": 2021},
"end": {"month": 8, "day": 13, "year": 2021}
}
}
:: EXAMPLE output:
{
"start_date": "2021-08-13",
"end_date": "2021-08-13"
}
"""
# define list of tmp keys for cleanup.
keys_to_remove = [dict_key, "start.day", "start.month", "start.year", "end.day", "end.month", "end.year", "start", "end"]
target_dict: Dict = record.get(dict_key)
for prop in props:
# Update the dict with flattened key:value pairs
for field in fields:
record.update(**{f"{prop}.{field}": target_dict.get(prop).get(field)})
# Build `start_date` & `end_date` fields from the nested structure.
record.update(**{"start_date": date_str_from_date_range(record, "start"), "end_date": date_str_from_date_range(record, "end")})
# Clean up tmp fields and the nested parts that were used
for key in keys_to_remove:
if key in record:
record.pop(key)
return record
def transform_targeting_criteria(record: Dict, dict_key: str = "targetingCriteria") -> Mapping[str, Any]:
"""
:: EXAMPLE `targetingCriteria` input structure:
{
"targetingCriteria": {
"include": {
"and": [
{
"or": {
"urn:li:adTargetingFacet:titles": [
"urn:li:title:100",
"urn:li:title:10326",
"urn:li:title:10457",
"urn:li:title:10738",
"urn:li:title:10966",
"urn:li:title:11349",
"urn:li:title:1159",
]
}
},
{"or": {"urn:li:adTargetingFacet:locations": ["urn:li:geo:103644278"]}},
{"or": {"urn:li:adTargetingFacet:interfaceLocales": ["urn:li:locale:en_US"]}},
]
},
"exclude": {
"or": {
"urn:li:adTargetingFacet:facet_Key1": [
"facet_test1",
"facet_test2",
],
"urn:li:adTargetingFacet:facet_Key2": [
"facet_test3",
"facet_test4",
],
}
}
}
}
:: EXAMPLE output:
{
"targetingCriteria": {
"include": {
"and": [
{
"type": "urn:li:adTargetingFacet:titles",
"values": [
"urn:li:title:100",
"urn:li:title:10326",
"urn:li:title:10457",
"urn:li:title:10738",
"urn:li:title:10966",
"urn:li:title:11349",
"urn:li:title:1159",
],
},
{
"type": "urn:li:adTargetingFacet:locations",
"values": ["urn:li:geo:103644278"],
},
{
"type": "urn:li:adTargetingFacet:interfaceLocales",
"values": ["urn:li:locale:en_US"],
},
]
},
"exclude": {
"or": [
{
"type": "urn:li:adTargetingFacet:facet_Key1",
"values": ["facet_test1", "facet_test2"],
},
{
"type": "urn:li:adTargetingFacet:facet_Key2",
"values": ["facet_test3", "facet_test4"],
},
]
},
}
}
"""
def unnest_dict(nested_dict: Dict) -> Iterable[Dict]:
"""
Unnest the nested dict to simplify the normalization
EXAMPLE OUTPUT:
[
{"type": "some_key", "values": "some_values"},
...,
{"type": "some_other_key", "values": "some_other_values"}
]
"""
for key, value in nested_dict.items():
values = []
if isinstance(value, List):
if len(value) > 0:
if isinstance(value[0], str):
values = value
elif isinstance(value[0], Dict):
for v in value:
values.append(v)
elif isinstance(value, Dict):
values.append(value)
yield {"type": key, "values": values}
# get the target dict from record
targeting_criteria = record.get(dict_key)
# transform `include`
if "include" in targeting_criteria:
and_list = targeting_criteria.get("include").get("and")
updated_include = {"and": []}
for k in and_list:
or_dict = k.get("or")
for j in unnest_dict(or_dict):
updated_include["and"].append(j)
# Replace the original 'include' with updated_include
record["targetingCriteria"]["include"] = updated_include
# transform `exclude` if present
if "exclude" in targeting_criteria:
or_dict = targeting_criteria.get("exclude").get("or")
updated_exclude = {"or": []}
for k in unnest_dict(or_dict):
updated_exclude["or"].append(k)
# Replace the original 'exclude' with updated_exclude
record["targetingCriteria"]["exclude"] = updated_exclude
return record
def transform_variables(record: Dict, dict_key: str = "variables") -> Mapping[str, Any]:
"""
:: EXAMPLE `variables` input:
{
"variables": {
"data": {
"com.linkedin.ads.SponsoredUpdateCreativeVariables": {
"activity": "urn:li:activity:1234",
"directSponsoredContent": 0,
"share": "urn:li:share:1234",
}
}
}
}
:: EXAMPLE output:
{
"variables": {
"type": "com.linkedin.ads.SponsoredUpdateCreativeVariables",
"values": [
{"key": "activity", "value": "urn:li:activity:1234"},
{"key": "directSponsoredContent", "value": 0},
{"key": "share", "value": "urn:li:share:1234"},
],
}
}
"""
variables = record.get(dict_key).get("data")
for key, params in variables.items():
record["variables"]["type"] = key
record["variables"]["values"] = []
for key, value in params.items():
# Convert values of various data types to strings
record["variables"]["values"].append({"key": key, "value": json.dumps(value, ensure_ascii=True)})
# Clean the nested structure
record["variables"].pop("data")
return record
def transform_col_names(record: Dict, dict_keys: list = []) -> Mapping[str, Any]:
"""
Rename record keys (columns) listed in `dict_keys` to avoid normalization issues for certain destinations.
Example:
`pivot` / `PIVOT` is a reserved keyword for Destination Redshift, so we should avoid using it as a column name.
https://github.com/airbytehq/airbyte/issues/13018
"""
for key in dict_keys:
if key in record:
record[f"_{key}"] = record[key] # create new key from original
record.pop(key) # remove the original key
return record
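# A tiny example with made-up values: transform_col_names({"pivot": "CAMPAIGN", "impressions": 100}, ["pivot"])
# returns {"impressions": 100, "_pivot": "CAMPAIGN"}, so the reserved column name never reaches the destination.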
def transform_pivot_values(record: Dict) -> Mapping[str, Any]:
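    """
    Joins the `pivotValues` list into a single comma-separated `string_of_pivot_values` field,
    e.g. ["urn:li:country:us", "urn:li:seniority:3"] -> "urn:li:country:us,urn:li:seniority:3" (values assumed).
    """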
pivot_values = record.get("pivotValues", [])
record["string_of_pivot_values"] = ",".join(pivot_values)
return record
def transform_data(records: List) -> Iterable[Mapping]:
"""
Transforms nested, complex data structures into simple key:value pairs
so they can be properly normalized in the destination.
"""
for record in records:
if "changeAuditStamps" in record:
record = transform_change_audit_stamps(record)
if "dateRange" in record:
record = transform_date_range(record)
if "targetingCriteria" in record:
record = transform_targeting_criteria(record)
if "variables" in record:
record = transform_variables(record)
if "pivotValues" in record:
record = transform_pivot_values(record)
record = transform_col_names(record, DESTINATION_RESERVED_KEYWORDS)
yield record
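# Usage sketch with a made-up analytics record (field values are illustrative only):
#   rows = [{"dateRange": {"start": {"year": 2021, "month": 8, "day": 13},
#                          "end": {"year": 2021, "month": 8, "day": 13}},
#            "pivotValues": ["urn:li:sponsoredCampaign:123"],
#            "pivot": "CAMPAIGN"}]
#   list(transform_data(rows)) yields a record with flattened "start_date"/"end_date" fields,
#   a "string_of_pivot_values" field, and "pivot" renamed to "_pivot".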