# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
import csv
import gzip
from copy import deepcopy
from dataclasses import dataclass
from datetime import datetime, timezone
from functools import cached_property
from io import StringIO
from typing import Any, Dict, Generator, Iterable, List, Mapping, MutableMapping, Optional
from airbyte_cdk.sources.declarative.decoders.decoder import Decoder
from airbyte_cdk.sources.declarative.extractors.record_filter import RecordFilter
from airbyte_cdk.sources.declarative.migrations.state_migration import StateMigration
from airbyte_cdk.sources.declarative.partition_routers.substream_partition_router import SubstreamPartitionRouter
from airbyte_cdk.sources.declarative.schema import SchemaLoader
from airbyte_cdk.sources.declarative.transformations import RecordTransformation
from airbyte_cdk.sources.declarative.types import StreamSlice, StreamState
PARENT_SLICE_KEY: str = "parent_slice"
@dataclass
class DuplicatedRecordsFilter(RecordFilter):
"""
Filter duplicated records based on the "Id" field.
This can happen when we use predicates that could match the same record
multiple times.
e.g.
With one record like:
{"type":"RECORD","record":{"stream":"accounts","data":{"Id":151049662,
"Name":"Airbyte Plumbing"},"emitted_at":1748277607993}}
account_names in config:
[
{
"name": "Airbyte",
"operator": "Contains"
},
{
"name": "Plumbing",
"operator": "Contains"
}
],
will return the same record twice, once for each predicate.
"""
CONFIG_PREDICATES = "account_names"
def __post_init__(self, parameters: Mapping[str, Any]) -> None:
super().__post_init__(parameters)
self._seen_keys = set()
@cached_property
def _using_predicates(self) -> bool:
"""
Indicates whether the connector uses predicates.
:return: True if the connector uses predicates, False otherwise
"""
predicates = self.config.get(self.CONFIG_PREDICATES)
return isinstance(predicates, list) and len(predicates) > 0
def filter_records(
self, records: List[Mapping[str, Any]], stream_state: StreamState, stream_slice: Optional[StreamSlice] = None, **kwargs
) -> Iterable[Mapping[str, Any]]:
for record in records:
if not self._using_predicates:
yield record
else:
key = record["Id"]
if key not in self._seen_keys:
self._seen_keys.add(key)
yield record
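# Illustrative usage sketch (comments only, not part of the connector's runtime; the
# constructor arguments mirror the declarative RecordFilter dataclass and are an assumption):
#
#     config = {"account_names": [{"name": "Airbyte", "operator": "Contains"},
#                                 {"name": "Plumbing", "operator": "Contains"}]}
#     record_filter = DuplicatedRecordsFilter(config=config, parameters={}, condition="")
#     records = [{"Id": 151049662, "Name": "Airbyte Plumbing"},
#                {"Id": 151049662, "Name": "Airbyte Plumbing"}]
#     deduped = list(record_filter.filter_records(records, stream_state={}))
#     # -> a single record with Id 151049662 is yielded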
@dataclass
class BingAdsCampaignsRecordTransformer(RecordTransformation):
"""
Transform Campaigns records from Bing Ads API to ensure consistent data structure and types.
This transformer handles two groups of transformations:
Settings field transformations:
1. For settings with Details, wrap the Details array in TargetSettingDetail structure:
{"Details": {"TargetSettingDetail": original_details}}
2. For settings without Details or with null Details, preserve the original structure
3. Convert empty lists ([]) to null for backward compatibility
4. Convert string values to integers for keys ending with "Id" (when valid integers)
5. Convert PageFeedIds lists to object format: {"long": [int_values]}
6. Wrap all transformed settings in {"Setting": transformed_settings}
BiddingScheme field transformations:
1. Recursively convert all integer values to floats to ensure consistent numeric type handling
Example Settings transformation:
Input: {"Settings": [{"Type": "Target", "Details": [...], "PageFeedIds": ["123", "456"]}]}
Output: {"Settings": {"Setting": [{"Type": "Target",
"Details": {"TargetSettingDetail": [...]},
"PageFeedIds": {"long": [123, 456]}}]}}
Example BiddingScheme transformation:
Input: {"BiddingScheme": {"MaxCpc": {"Amount": 5}}}
Output: {"BiddingScheme": {"MaxCpc": {"Amount": 5.0}}}
"""
def transform(
self,
record: MutableMapping[str, Any],
config: Optional[Mapping[str, Any]] = None,
stream_state: Optional[StreamState] = None,
stream_slice: Optional[StreamSlice] = None,
) -> None:
"""
Transform the record by converting the Settings and BiddingScheme properties.
Args:
record: The campaign record to transform (modified in-place)
config: Optional configuration (unused)
stream_state: Optional stream state (unused)
stream_slice: Optional stream slice (unused)
"""
settings = record.get("Settings")
bidding_scheme = record.get("BiddingScheme")
if settings:
self._transform_settings_property(record, settings)
if bidding_scheme:
self._transform_bidding_scheme_property(bidding_scheme)
def _transform_settings_property(self, record: MutableMapping[str, Any], settings: Any) -> None:
"""
Transform the Settings property of a campaign record.
Converts the Settings list into the expected nested structure and applies
value transformations to individual setting properties.
Args:
record: The campaign record containing the Settings (modified in-place)
settings: The Settings value from the record
"""
if not isinstance(settings, list) or len(settings) == 0:
# Keep original value (None, empty list, etc.)
return
transformed_settings = []
for setting in settings:
if not isinstance(setting, dict):
# Keep non-dict settings as-is
transformed_settings.append(setting)
continue
if "Details" in setting and setting["Details"] is not None:
# Wrap Details in TargetSettingDetail only if Details is not None
transformed_setting = {
"Type": setting.get("Type"),
"Details": {"TargetSettingDetail": setting["Details"]},
}
# Add any other properties that might exist
for key, value in setting.items():
if key not in ["Type", "Details"]:
transformed_setting[key] = self._transform_setting_value(key, value)
transformed_settings.append(transformed_setting)
else:
# Keep setting as-is (no Details to wrap or Details is None)
# But still convert empty lists to null and string IDs to integers
transformed_setting = {}
for key, value in setting.items():
transformed_setting[key] = self._transform_setting_value(key, value)
transformed_settings.append(transformed_setting)
# Wrap the transformed settings in the expected structure
record["Settings"] = {"Setting": transformed_settings}
def _transform_setting_value(self, key: str, value: Any) -> Any:
"""
Transform individual setting values based on key name and value type.
Applies specific transformations:
- Empty lists become null
- PageFeedIds lists become {"long": [int_values]} objects
- String values for keys ending with "Id" become integers (when valid)
Args:
key: The setting property name
value: The setting property value
Returns:
The transformed value
"""
# Convert empty lists to null for backward compatibility
if isinstance(value, list) and len(value) == 0:
return None
elif key == "PageFeedIds":
# Convert PageFeedIds list to object with long array
return self._convert_page_feed_id_lists(value)
else:
# Convert string IDs to integers
return self._convert_id_strings_to_integers(key, value)
def _transform_bidding_scheme_property(self, bidding_scheme: Any) -> None:
"""
Transform the BiddingScheme property of a campaign record.
Recursively converts all integer values to floats for consistent numeric handling.
"""
if bidding_scheme and isinstance(bidding_scheme, dict):
self._convert_integers_to_floats(bidding_scheme)
def _convert_integers_to_floats(self, obj: MutableMapping[str, Any]) -> None:
"""
Recursively convert integer values to floats in a dictionary.
This ensures consistent numeric type handling across all BiddingScheme values.
Only converts values that are whole numbers (int or float with no decimal part).
"""
if not isinstance(obj, dict):
return
for key, value in obj.items():
if isinstance(value, dict):
self._convert_integers_to_floats(value)
continue
# Convert any whole numbers to float type
if isinstance(value, (int, float)) and value == int(value):
obj[key] = float(value)
def _convert_id_strings_to_integers(self, key: str, value: Any) -> Any:
"""
Convert string values to integers for keys ending with "Id".
Only converts if the string represents a valid integer. If conversion fails,
the original string value is preserved.
"""
if key.endswith("Id") and isinstance(value, str):
try:
return int(value)
except ValueError:
# If conversion fails, return original value
return value
return value
def _convert_page_feed_id_lists(self, value: Any) -> Any:
"""
Convert PageFeedIds from list of strings to object with long array of integers.
This transformation is required for compatibility with the expected API format.
If any string cannot be converted to an integer, the original value is returned.
Example:
Input: ["8246337222870", "1234567890"]
Output: {"long": [8246337222870, 1234567890]}
"""
if isinstance(value, list) and len(value) > 0:
try:
# Convert string IDs to integers
long_values = [int(id_str) for id_str in value if isinstance(id_str, str)]
return {"long": long_values}
except ValueError:
# If conversion fails, return original value
return value
return value
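# Illustrative sketch of the transformations described in the class docstring (comments only,
# not part of the connector's runtime; the "Details" payload is a made-up placeholder):
#
#     transformer = BingAdsCampaignsRecordTransformer()
#     record = {
#         "Settings": [{"Type": "Target", "Details": [{"Criterion": "Audience"}], "PageFeedIds": ["123"]}],
#         "BiddingScheme": {"MaxCpc": {"Amount": 5}},
#     }
#     transformer.transform(record)
#     # record["Settings"] == {"Setting": [{"Type": "Target",
#     #                                     "Details": {"TargetSettingDetail": [{"Criterion": "Audience"}]},
#     #                                     "PageFeedIds": {"long": [123]}}]}
#     # record["BiddingScheme"] == {"MaxCpc": {"Amount": 5.0}}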
@dataclass
class BulkDatetimeToRFC3339(RecordTransformation):
"""
Bing Ads Bulk API provides datetime fields in a custom format with milliseconds: "04/27/2023 18:00:14.970".
Return the datetime in RFC3339 format: "2023-04-27T18:00:14.970+00:00".
"""
def transform(
self,
record: MutableMapping[str, Any],
config: Optional[Mapping[str, Any]] = None,
stream_state: Optional[StreamState] = None,
stream_slice: Optional[StreamSlice] = None,
) -> None:
original_value = record["Modified Time"]
if original_value is not None and original_value != "":
try:
record["Modified Time"] = (
datetime.strptime(original_value, "%m/%d/%Y %H:%M:%S.%f")
.replace(tzinfo=timezone.utc)
.isoformat(timespec="milliseconds")
)
except ValueError:
pass # Keep original value if parsing fails
# Don't set to None - leave original value unchanged
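# Illustrative sketch (comments only, not part of the connector's runtime): the transform
# mutates the record in place.
#
#     transformer = BulkDatetimeToRFC3339()
#     record = {"Modified Time": "04/27/2023 18:00:14.970"}
#     transformer.transform(record)
#     # record["Modified Time"] == "2023-04-27T18:00:14.970+00:00"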
@dataclass
class LightSubstreamPartitionRouter(SubstreamPartitionRouter):
def stream_slices(self) -> Iterable[StreamSlice]:
"""
For the migration to the manifest connector we needed to migrate legacy state to per-partition state,
but the regular SubstreamPartitionRouter includes the parent_slice in the partition, which
LegacyToPerPartitionStateMigration cannot add to the transformed state.
Therefore, we remove the parent_slice here.
e.g.
super().stream_slices() = [
StreamSlice(partition={"parent_slice": {"user_id": 1, "parent_slice": {}}, "account_id": 1}, cursor_slice={}, extra_fields=None),
StreamSlice(partition={"parent_slice": {"user_id": 2, "parent_slice": {}}, "account_id": 2}, cursor_slice={}, extra_fields=None) ]
Router yields: [
StreamSlice(partition={"account_id": 1}, cursor_slice={}, extra_fields=None),
StreamSlice(partition={"account_id": 2}, cursor_slice={}, extra_fields=None),
]
"""
stream_slices = super().stream_slices()
for stream_slice in stream_slices:
stream_slice_partition: Dict[str, Any] = dict(stream_slice.partition)
partition_keys = list(stream_slice_partition.keys())
if PARENT_SLICE_KEY in partition_keys:
partition_keys.remove(PARENT_SLICE_KEY)
stream_slice_partition.pop(PARENT_SLICE_KEY, None)
if len(partition_keys) != 1:
raise ValueError(f"SubstreamDedupPartitionRouter expects a single partition key-value pair. Got {stream_slice_partition}")
yield StreamSlice(
partition=stream_slice_partition,
cursor_slice=stream_slice.cursor_slice,
extra_fields=stream_slice.extra_fields,
)
class BulkStreamsStateMigration(StateMigration):
"""
Due to a bug in the Python implementation, legacy state may look like this:
"streamState": {
"account_id": {
"Modified Time": "valid modified time"
},
"Id": "Id",
[record data ...]
"Modified Time": null,
[record data ...]
}
This happens when a received record does not have a cursor field and the state-updating logic stores the record in the state anyway.
To avoid parsing null cursor fields, which raises a ValueError, this state migration deletes the top-level cursor field and the
record data when the cursor is null.
"""
cursor_field = "Modified Time"
def should_migrate(self, stream_state: Mapping[str, Any]) -> bool:
if self.cursor_field in stream_state.keys() and stream_state.get(self.cursor_field) is None:
return True
return False
def migrate(self, stream_state: Mapping[str, Any]) -> Mapping[str, Any]:
if self.should_migrate(stream_state):
state_copy = deepcopy(stream_state)
for key, value in state_copy.items():
if not isinstance(value, dict):
del stream_state[key]
return stream_state
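# Illustrative sketch of the migration (comments only, not part of the connector's runtime;
# "180535609" is a hypothetical account id): top-level non-dict entries are dropped when the
# top-level cursor is null, while per-account state is preserved.
#
#     migration = BulkStreamsStateMigration()
#     state = {"180535609": {"Modified Time": "2023-04-27T18:00:14.970+00:00"},
#              "Id": "Id", "Modified Time": None}
#     migration.migrate(state)
#     # -> {"180535609": {"Modified Time": "2023-04-27T18:00:14.970+00:00"}}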
@dataclass
class CustomReportSchemaLoader(SchemaLoader):
"""
Creates custom report schema based on provided reporting columns.
"""
reporting_columns: List[str]
report_aggregation: str
def get_json_schema(self) -> Mapping[str, Any]:
if self.report_aggregation == "DayOfWeek":
self.reporting_columns = self.reporting_columns + ["DayOfWeek", "StartOfTimePeriod", "EndOfTimePeriod"]
if self.report_aggregation == "HourOfDay":
self.reporting_columns = self.reporting_columns + ["HourOfDay", "StartOfTimePeriod", "EndOfTimePeriod"]
self.reporting_columns = list(frozenset(self.reporting_columns))
columns_schema = {col: {"type": ["null", "string"]} for col in self.reporting_columns}
schema: Mapping[str, Any] = {
"$schema": "https://json-schema.org/draft-07/schema#",
"type": ["null", "object"],
"additionalProperties": True,
"properties": columns_schema,
}
return schema
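# Illustrative sketch (comments only, not part of the connector's runtime): for a "DayOfWeek"
# aggregation the loader extends the user-provided columns with the aggregation-specific fields
# and emits a schema of nullable strings.
#
#     loader = CustomReportSchemaLoader(reporting_columns=["AccountId", "Clicks"], report_aggregation="DayOfWeek")
#     schema = loader.get_json_schema()
#     # schema["properties"] contains "AccountId", "Clicks", "DayOfWeek",
#     # "StartOfTimePeriod" and "EndOfTimePeriod", each typed as ["null", "string"]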
@dataclass
class CustomReportTransformation(RecordTransformation):
report_aggregation: str
def transform_report_hourly_datetime_format_to_rfc_3339(self, original_value: str) -> str:
"""
Bing Ads API reports with hourly aggregation provide date fields in a custom format: "2023-11-04|11".
Return the date in RFC3339 format: "2023-11-04T11:00:00+00:00".
"""
return datetime.strptime(original_value, "%Y-%m-%d|%H").replace(tzinfo=timezone.utc).isoformat(timespec="seconds")
def transform(
self,
record: MutableMapping[str, Any],
config: Optional[Mapping[str, Any]] = None,
stream_state: Optional[StreamState] = None,
stream_slice: Optional[StreamSlice] = None,
) -> None:
if self.report_aggregation == "Hourly":
if record.get("TimePeriod"):
record.update({"TimePeriod": self.transform_report_hourly_datetime_format_to_rfc_3339(record["TimePeriod"])})
if self.report_aggregation == "DayOfWeek":
cursor_field = record["TimePeriod"]
record.update(
{
"StartOfTimePeriod": stream_slice["start_time"],
"EndOfTimePeriod": stream_slice["end_time"],
"DayOfWeek": cursor_field,
"TimePeriod": stream_slice["end_time"],
}
)
record["TimePeriod"] = record["EndOfTimePeriod"]
if self.report_aggregation == "HourOfDay":
cursor_field = record["TimePeriod"]
record.update(
{
"StartOfTimePeriod": stream_slice["start_time"],
"EndOfTimePeriod": stream_slice["end_time"],
"HourOfDay": cursor_field,
"TimePeriod": stream_slice["end_time"],
}
)
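# Illustrative sketch of the hourly transformation (comments only, not part of the connector's
# runtime; the "DayOfWeek"/"HourOfDay" branches additionally require a stream_slice with
# "start_time" and "end_time"):
#
#     transformer = CustomReportTransformation(report_aggregation="Hourly")
#     record = {"TimePeriod": "2023-11-04|11"}
#     transformer.transform(record)
#     # record["TimePeriod"] == "2023-11-04T11:00:00+00:00"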
@dataclass
class BingAdsGzipCsvDecoder(Decoder):
"""
Custom decoder that always attempts GZip decompression before parsing CSV data.
This is needed because Bing Ads now sends GZip compressed files from Azure Blob Storage
without proper compression headers, so the standard GzipDecoder fails to detect compression.
"""
def is_stream_response(self) -> bool:
return False
def decode(self, response) -> Generator[MutableMapping[str, Any], None, None]:
"""
Always attempt GZip decompression first, then fall back to plain CSV if that fails.
"""
try:
# First, try to decompress as GZip
decompressed_content = gzip.decompress(response.content)
# Parse as CSV with utf-8-sig encoding (handles BOM)
text_content = decompressed_content.decode("utf-8-sig")
csv_reader = csv.DictReader(StringIO(text_content))
for row in csv_reader:
yield row
except (gzip.BadGzipFile, OSError):
# If GZip decompression fails, try parsing as plain CSV
try:
text_content = response.content.decode("utf-8-sig")
csv_reader = csv.DictReader(StringIO(text_content))
for row in csv_reader:
yield row
except Exception as e:
# If both fail, log the error and yield empty
import logging
logger = logging.getLogger(__name__)
logger.error(f"Failed to parse response as either GZip or plain CSV: {e}")
yield {}
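# Illustrative sketch of the decoder's fallback behaviour (comments only, not part of the
# connector's runtime; SimpleNamespace is a minimal stand-in for the HTTP response object,
# which only needs a ``content`` attribute here):
#
#     import gzip
#     from types import SimpleNamespace
#
#     csv_bytes = b"Id,Name\n1,Airbyte\n"
#     decoder = BingAdsGzipCsvDecoder()
#     list(decoder.decode(SimpleNamespace(content=gzip.compress(csv_bytes))))  # -> [{"Id": "1", "Name": "Airbyte"}]
#     list(decoder.decode(SimpleNamespace(content=csv_bytes)))                 # -> [{"Id": "1", "Name": "Airbyte"}]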