#
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
#

import json
from dataclasses import dataclass
from typing import Any, Dict, Iterable, List, Mapping, MutableMapping, Optional, Union
from urllib.parse import urlencode

import requests
from requests.exceptions import InvalidURL

from airbyte_cdk.models import FailureType
from airbyte_cdk.sources.declarative.extractors.record_extractor import RecordExtractor
from airbyte_cdk.sources.declarative.requesters import HttpRequester
from airbyte_cdk.sources.declarative.requesters.error_handlers import DefaultErrorHandler
from airbyte_cdk.sources.declarative.requesters.request_options.interpolated_request_options_provider import (
    InterpolatedRequestOptionsProvider,
    RequestInput,
)
from airbyte_cdk.sources.streams.http import HttpClient
from airbyte_cdk.sources.streams.http.error_handlers import ErrorResolution, ResponseAction
from airbyte_cdk.sources.streams.http.exceptions import DefaultBackoffException, RequestBodyException, UserDefinedBackoffException
from airbyte_cdk.sources.streams.http.http import BODY_REQUEST_METHODS
from airbyte_cdk.utils.datetime_helpers import AirbyteDateTime, ab_datetime_parse


# Replace `pivot` with `_pivot` to allow Redshift normalization,
# since `pivot` is a reserved keyword for Destination Redshift.
# See https://github.com/airbytehq/airbyte/issues/13018; expand this list if required.
DESTINATION_RESERVED_KEYWORDS: list = ["pivot"]


class SafeHttpClient(HttpClient):
    """
    A custom HTTP client that safely encodes query parameters, ensuring that the symbols ():,% are preserved
    during UTF-8 encoding.
    """

    def _create_prepared_request(
        self,
        http_method: str,
        url: str,
        dedupe_query_params: bool = False,
        headers: Optional[Mapping[str, str]] = None,
        params: Optional[Mapping[str, str]] = None,
        json: Optional[Mapping[str, Any]] = None,
        data: Optional[Union[str, Mapping[str, Any]]] = None,
    ) -> requests.PreparedRequest:
        """
        Prepares an HTTP request with optional deduplication of query parameters and safe encoding.
        """
        if dedupe_query_params:
            query_params = self._dedupe_query_params(url, params)
        else:
            query_params = params or {}
        query_params = urlencode(query_params, safe="():,%")
        args = {"method": http_method, "url": url, "headers": headers, "params": query_params}
        if http_method.upper() in BODY_REQUEST_METHODS:
            if json and data:
                raise RequestBodyException(
                    "Only one of 'request_body_data' and 'request_body_json' can return data at the same time"
                )
            elif json:
                args["json"] = json
            elif data:
                args["data"] = data
        prepared_request: requests.PreparedRequest = self._session.prepare_request(requests.Request(**args))

        return prepared_request


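# Illustrative sketch (comments only, not executed): the `safe="():,%"` argument above keeps
# LinkedIn's Restli-style parameter syntax intact, whereas the default encoding would escape it.
# Using Python's standard `urllib.parse.urlencode`:
#   urlencode({"dateRange": "(start:(year:2021,month:8,day:13))"}, safe="():,%")
#   -> "dateRange=(start:(year:2021,month:8,day:13))"
#   urlencode({"dateRange": "(start:(year:2021,month:8,day:13))"})
#   -> "dateRange=%28start%3A%28year%3A2021%2Cmonth%3A8%2Cday%3A13%29%29"
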
@dataclass
class SafeEncodeHttpRequester(HttpRequester):
    """
    A custom HTTP requester that ensures safe encoding of query parameters, preserving the symbols ():,% during UTF-8 encoding.
    """

    request_body_json: Optional[RequestInput] = None
    request_headers: Optional[RequestInput] = None
    request_parameters: Optional[RequestInput] = None
    request_body_data: Optional[RequestInput] = None
    query_properties_key: Optional[str] = None

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        """
        Initializes the request options provider with the provided parameters and any
        configured request components like headers, parameters, or bodies.
        """
        self.request_options_provider = InterpolatedRequestOptionsProvider(
            request_body_data=self.request_body_data,
            request_body_json=self.request_body_json,
            request_headers=self.request_headers,
            request_parameters=self.request_parameters,
            query_properties_key=self.query_properties_key,
            config=self.config,
            parameters=parameters or {},
        )
        super().__post_init__(parameters)

        if self.error_handler is not None and hasattr(self.error_handler, "backoff_strategies"):
            backoff_strategies = self.error_handler.backoff_strategies
        else:
            backoff_strategies = None

        self._http_client = SafeHttpClient(
            name=self.name,
            logger=self.logger,
            error_handler=self.error_handler,
            authenticator=self._authenticator,
            use_cache=self.use_cache,
            backoff_strategy=backoff_strategies,
            disable_retries=self.disable_retries,
            message_repository=self.message_repository,
        )


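# Minimal sketch of how a custom requester like this is typically referenced from a declarative
# manifest (YAML shown as comments; the module path, url_base and parameters below are assumptions
# for illustration, not taken from this file):
#
#   requester:
#     type: CustomRequester
#     class_name: "source_linkedin_ads.components.SafeEncodeHttpRequester"
#     url_base: "https://api.linkedin.com/rest/"
#     http_method: "GET"
#     request_parameters:
#       q: "search"
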
@dataclass
class LinkedInAdsRecordExtractor(RecordExtractor):
    """
    Extracts and transforms LinkedIn Ads records, ensuring that 'lastModified' and 'created'
    date-time fields are formatted to RFC3339.
    """

    def _date_time_to_rfc3339(self, record: MutableMapping[str, Any]) -> MutableMapping[str, Any]:
        """
        Converts 'lastModified' and 'created' fields in the record to RFC3339 format.
        """
        for item in ["lastModified", "created"]:
            if record.get(item) is not None:
                record[item] = ab_datetime_parse(record[item]).to_datetime().isoformat()
        return record

    def extract_records(self, response: requests.Response) -> Iterable[Mapping[str, Any]]:
        """
        Extracts and transforms records from an HTTP response.
        """
        for record in transform_data(response.json().get("elements")):
            yield self._date_time_to_rfc3339(record)


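# Worked example (comments only, assuming ab_datetime_parse treats naive timestamps as UTC):
#   input record:                 {"created": "2021-08-21 21:27:55", ...}
#   after _date_time_to_rfc3339:  {"created": "2021-08-21T21:27:55+00:00", ...}
# i.e. the "%Y-%m-%d %H:%M:%S" strings produced by transform_change_audit_stamps are re-emitted
# as RFC3339 / ISO8601 timestamps.
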
@dataclass
class LinkedInAdsErrorHandler(DefaultErrorHandler):
    """
    An error handler for LinkedIn Ads that interprets responses, providing custom error resolutions
    for specific exceptions like `InvalidURL`.
    This is a temporary workaround until this is updated in the CDK. https://github.com/airbytehq/airbyte-internal-issues/issues/11320
    """

    def interpret_response(self, response_or_exception: Optional[Union[requests.Response, Exception]]) -> ErrorResolution:
        """
        Interprets responses and exceptions, providing custom error resolutions for specific exceptions.
        """
        if isinstance(response_or_exception, InvalidURL):
            return ErrorResolution(
                response_action=ResponseAction.RETRY,
                failure_type=FailureType.transient_error,
                error_message="source-linkedin-ads has faced a temporary DNS resolution issue. Retrying...",
            )
        return super().interpret_response(response_or_exception)


def transform_change_audit_stamps(
    record: Dict, dict_key: str = "changeAuditStamps", props: List = ["created", "lastModified"], fields: List = ["time"]
) -> Mapping[str, Any]:
    """
    :: EXAMPLE `changeAuditStamps` input structure:
        {
            "changeAuditStamps": {
                "created": {"time": 1629581275000},
                "lastModified": {"time": 1629664544760}
            }
        }
    :: EXAMPLE output:
        {
            "created": "2021-08-21 21:27:55",
            "lastModified": "2021-08-22 20:35:44"
        }
    """

    target_dict: Dict = record.get(dict_key)
    for prop in props:
        # Update the record with the flattened key:value
        for field in fields:
            record[prop] = ab_datetime_parse(target_dict.get(prop).get(field) // 1000).to_datetime().strftime("%Y-%m-%d %H:%M:%S")
    record.pop(dict_key)

    return record


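# Worked arithmetic for the docstring example above: LinkedIn returns epoch milliseconds, so
# 1629581275000 // 1000 == 1629581275 seconds, i.e. 2021-08-21 21:27:55 UTC.
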
def date_str_from_date_range(record: Dict, prefix: str) -> str:
    """
    Makes an ISO8601 date string from the flattened <prefix>.<part of the date> fields.
    EXAMPLE:
        Input: record
            {
                "start.year": 2021, "start.month": 8, "start.day": 13,
                "end.year": 2021, "end.month": 9, "end.day": 30
            }
    EXAMPLE output:
        With `prefix` = "start"
            str: "2021-08-13"
        With `prefix` = "end"
            str: "2021-09-30"
    """

    year = record.get(f"{prefix}.year")
    month = record.get(f"{prefix}.month")
    day = record.get(f"{prefix}.day")
    return AirbyteDateTime(year, month, day).to_datetime().strftime("%Y-%m-%d")


def transform_date_range(
    record: Dict,
    dict_key: str = "dateRange",
    props: List = ["start", "end"],
    fields: List = ["year", "month", "day"],
) -> Mapping[str, Any]:
    """
    :: EXAMPLE `dateRange` input structure in Analytics streams:
        {
            "dateRange": {
                "start": {"month": 8, "day": 13, "year": 2021},
                "end": {"month": 8, "day": 13, "year": 2021}
            }
        }
    :: EXAMPLE output:
        {
            "start_date": "2021-08-13",
            "end_date": "2021-08-13"
        }
    """
    # define the list of tmp keys for cleanup.
    keys_to_remove = [dict_key, "start.day", "start.month", "start.year", "end.day", "end.month", "end.year", "start", "end"]

    target_dict: Dict = record.get(dict_key)
    for prop in props:
        # Update the record with the flattened key:value
        for field in fields:
            record.update(**{f"{prop}.{field}": target_dict.get(prop).get(field)})
    # Build the `start_date` & `end_date` fields from the nested structure.
    record.update(**{"start_date": date_str_from_date_range(record, "start"), "end_date": date_str_from_date_range(record, "end")})
    # Clean up the tmp fields & the used nested parts
    for key in keys_to_remove:
        if key in record:
            record.pop(key)
    return record


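# Step-by-step sketch for the docstring example above (intermediate state only, then cleaned up):
#   after flattening:  {"start.year": 2021, "start.month": 8, "start.day": 13,
#                       "end.year": 2021, "end.month": 8, "end.day": 13, ...}
#   after building:    {"start_date": "2021-08-13", "end_date": "2021-08-13", ...}
#   keys_to_remove then drops "dateRange" and all the temporary "start.*" / "end.*" keys.
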
def transform_targeting_criteria(record: Dict, dict_key: str = "targetingCriteria") -> Mapping[str, Any]:
    """
    :: EXAMPLE `targetingCriteria` input structure:
        {
            "targetingCriteria": {
                "include": {
                    "and": [
                        {
                            "or": {
                                "urn:li:adTargetingFacet:titles": [
                                    "urn:li:title:100",
                                    "urn:li:title:10326",
                                    "urn:li:title:10457",
                                    "urn:li:title:10738",
                                    "urn:li:title:10966",
                                    "urn:li:title:11349",
                                    "urn:li:title:1159",
                                ]
                            }
                        },
                        {"or": {"urn:li:adTargetingFacet:locations": ["urn:li:geo:103644278"]}},
                        {"or": {"urn:li:adTargetingFacet:interfaceLocales": ["urn:li:locale:en_US"]}},
                    ]
                },
                "exclude": {
                    "or": {
                        "urn:li:adTargetingFacet:facet_Key1": [
                            "facet_test1",
                            "facet_test2",
                        ],
                        "urn:li:adTargetingFacet:facet_Key2": [
                            "facet_test3",
                            "facet_test4",
                        ],
                    }
                }
            }
        }
    :: EXAMPLE output:
        {
            "targetingCriteria": {
                "include": {
                    "and": [
                        {
                            "type": "urn:li:adTargetingFacet:titles",
                            "values": [
                                "urn:li:title:100",
                                "urn:li:title:10326",
                                "urn:li:title:10457",
                                "urn:li:title:10738",
                                "urn:li:title:10966",
                                "urn:li:title:11349",
                                "urn:li:title:1159",
                            ],
                        },
                        {
                            "type": "urn:li:adTargetingFacet:locations",
                            "values": ["urn:li:geo:103644278"],
                        },
                        {
                            "type": "urn:li:adTargetingFacet:interfaceLocales",
                            "values": ["urn:li:locale:en_US"],
                        },
                    ]
                },
                "exclude": {
                    "or": [
                        {
                            "type": "urn:li:adTargetingFacet:facet_Key1",
                            "values": ["facet_test1", "facet_test2"],
                        },
                        {
                            "type": "urn:li:adTargetingFacet:facet_Key2",
                            "values": ["facet_test3", "facet_test4"],
                        },
                    ]
                },
            }
        }
    """

    def unnest_dict(nested_dict: Dict) -> Iterable[Dict]:
        """
        Unnest the nested dict to simplify the normalization
        EXAMPLE OUTPUT:
            [
                {"type": "some_key", "values": "some_values"},
                ...,
                {"type": "some_other_key", "values": "some_other_values"}
            ]
        """

        for key, value in nested_dict.items():
            values = []
            if isinstance(value, List):
                if len(value) > 0:
                    if isinstance(value[0], str):
                        values = value
                    elif isinstance(value[0], Dict):
                        for v in value:
                            values.append(v)
            elif isinstance(value, Dict):
                values.append(value)
            yield {"type": key, "values": values}

    # get the target dict from the record
    targeting_criteria = record.get(dict_key)

    # transform `include`
    if "include" in targeting_criteria:
        and_list = targeting_criteria.get("include").get("and")
        updated_include = {"and": []}
        for k in and_list:
            or_dict = k.get("or")
            for j in unnest_dict(or_dict):
                updated_include["and"].append(j)
        # Replace the original 'and' with updated_include
        record["targetingCriteria"]["include"] = updated_include

    # transform `exclude` if present
    if "exclude" in targeting_criteria:
        or_dict = targeting_criteria.get("exclude").get("or")
        updated_exclude = {"or": []}
        for k in unnest_dict(or_dict):
            updated_exclude["or"].append(k)
        # Replace the original 'or' with updated_exclude
        record["targetingCriteria"]["exclude"] = updated_exclude

    return record


def transform_variables(record: Dict, dict_key: str = "variables") -> Mapping[str, Any]:
    """
    :: EXAMPLE `variables` input:
        {
            "variables": {
                "data": {
                    "com.linkedin.ads.SponsoredUpdateCreativeVariables": {
                        "activity": "urn:li:activity:1234",
                        "directSponsoredContent": 0,
                        "share": "urn:li:share:1234",
                    }
                }
            }
        }
    :: EXAMPLE output:
        {
            "variables": {
                "type": "com.linkedin.ads.SponsoredUpdateCreativeVariables",
                "values": [
                    {"key": "activity", "value": "urn:li:activity:1234"},
                    {"key": "directSponsoredContent", "value": 0},
                    {"key": "share", "value": "urn:li:share:1234"},
                ],
            }
        }
    """

    variables = record.get(dict_key).get("data")
    for key, params in variables.items():
        record["variables"]["type"] = key
        record["variables"]["values"] = []
        for key, value in params.items():
            # serialize values of various datatypes into strings
            record["variables"]["values"].append({"key": key, "value": json.dumps(value, ensure_ascii=True)})
    # Clean up the nested structure
    record["variables"].pop("data")
    return record


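# Note on value serialization (standard-library behaviour, shown for clarity): json.dumps is applied
# to every value, so numbers and strings are both emitted as JSON text, e.g.
#   json.dumps(0) == "0"
#   json.dumps("urn:li:share:1234") == '"urn:li:share:1234"'
# so the emitted "value" field is effectively always a string.
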
def transform_col_names(record: Dict, dict_keys: list = []) -> Mapping[str, Any]:
    """
    Renames record keys (columns) listed in `dict_keys` to avoid normalization issues for certain destinations.
    Example:
        `pivot` (or `PIVOT`) is a reserved keyword for Destination Redshift, so we should avoid using it here.
        https://github.com/airbytehq/airbyte/issues/13018
    """
    for key in dict_keys:
        if key in record:
            record[f"_{key}"] = record[key]  # create the new key from the original
            record.pop(key)  # remove the original key
    return record


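# Quick example: transform_col_names({"pivot": "CAMPAIGN", "clicks": 10}, ["pivot"])
#   -> {"clicks": 10, "_pivot": "CAMPAIGN"}
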
def transform_pivot_values(record: Dict) -> Mapping[str, Any]:
    pivot_values = record.get("pivotValues", [])
    record["string_of_pivot_values"] = ",".join(pivot_values)
    return record


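# Quick example: transform_pivot_values({"pivotValues": ["urn:li:sponsoredCampaign:123", "urn:li:sponsoredAccount:456"]})
#   -> {"pivotValues": [...], "string_of_pivot_values": "urn:li:sponsoredCampaign:123,urn:li:sponsoredAccount:456"}
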
def transform_data(records: List) -> Iterable[Mapping]:
    """
    Transforms nested, complex data structures into simple key:value pairs
    so that records can be properly normalized in the destination.
    """
    for record in records:
        if "changeAuditStamps" in record:
            record = transform_change_audit_stamps(record)

        if "dateRange" in record:
            record = transform_date_range(record)

        if "targetingCriteria" in record:
            record = transform_targeting_criteria(record)

        if "variables" in record:
            record = transform_variables(record)

        if "pivotValues" in record:
            record = transform_pivot_values(record)

        record = transform_col_names(record, DESTINATION_RESERVED_KEYWORDS)

        yield record


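# Minimal usage sketch (comments only; sample field values are illustrative assumptions):
#   list(transform_data([{"dateRange": {"start": {"year": 2021, "month": 8, "day": 13},
#                                       "end": {"year": 2021, "month": 8, "day": 13}},
#                         "pivot": "CAMPAIGN",
#                         "pivotValues": ["urn:li:sponsoredCampaign:123"]}]))
#   -> [{"start_date": "2021-08-13", "end_date": "2021-08-13",
#        "pivotValues": ["urn:li:sponsoredCampaign:123"],
#        "string_of_pivot_values": "urn:li:sponsoredCampaign:123",
#        "_pivot": "CAMPAIGN"}]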