# Copyright (c) 2025 Airbyte, Inc., all rights reserved.

import csv
import gzip
from copy import deepcopy
from dataclasses import dataclass
from datetime import datetime, timezone
from functools import cached_property
from io import StringIO
from typing import Any, Dict, Generator, Iterable, List, Mapping, MutableMapping, Optional

from airbyte_cdk.sources.declarative.decoders.decoder import Decoder
from airbyte_cdk.sources.declarative.extractors.record_filter import RecordFilter
from airbyte_cdk.sources.declarative.migrations.state_migration import StateMigration
from airbyte_cdk.sources.declarative.partition_routers.substream_partition_router import SubstreamPartitionRouter
from airbyte_cdk.sources.declarative.schema import SchemaLoader
from airbyte_cdk.sources.declarative.transformations import RecordTransformation
from airbyte_cdk.sources.declarative.types import StreamSlice, StreamState

PARENT_SLICE_KEY: str = "parent_slice"


@dataclass
class DuplicatedRecordsFilter(RecordFilter):
    """
    Filter duplicated records based on the "Id" field.

    Duplicates can happen when we use predicates that could match the same record multiple times.

    e.g. With one record like:
    {"type":"RECORD","record":{"stream":"accounts","data":{"Id":151049662, "Name":"Airbyte Plumbing"},"emitted_at":1748277607993}}

    account_names in config:
    [
        {"name": "Airbyte", "operator": "Contains"},
        {"name": "Plumbing", "operator": "Contains"}
    ]

    will return the same record twice, once for each predicate.
    """

    CONFIG_PREDICATES = "account_names"

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        super().__post_init__(parameters)
        self._seen_keys = set()

    @cached_property
    def _using_predicates(self) -> bool:
        """
        Indicates whether the connector uses predicates.

        :return: True if the connector uses predicates, False otherwise
        """
        predicates = self.config.get(self.CONFIG_PREDICATES)
        return bool(predicates and isinstance(predicates, list))

    def filter_records(
        self,
        records: List[Mapping[str, Any]],
        stream_state: StreamState,
        stream_slice: Optional[StreamSlice] = None,
        **kwargs,
    ) -> Iterable[Mapping[str, Any]]:
        for record in records:
            if not self._using_predicates:
                yield record
            else:
                key = record["Id"]
                if key not in self._seen_keys:
                    self._seen_keys.add(key)
                    yield record
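

# Illustrative sketch (comment only): given a filter configured with the two "Contains" predicates
# from the docstring above (instance construction omitted here, so the variable name is hypothetical),
# the same account arrives once per predicate and only the first copy is emitted, keyed on "Id".
#
#     records = [
#         {"Id": 151049662, "Name": "Airbyte Plumbing"},
#         {"Id": 151049662, "Name": "Airbyte Plumbing"},
#     ]
#     list(duplicated_records_filter.filter_records(records, stream_state={}))
#     # -> [{"Id": 151049662, "Name": "Airbyte Plumbing"}]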


@dataclass
class BingAdsCampaignsRecordTransformer(RecordTransformation):
    """
    Transform Campaigns records from Bing Ads API to ensure consistent data structure and types.

    This transformer handles two main transformations:

    Settings field transformations:
    1. For settings with Details, wrap the Details array in TargetSettingDetail structure:
       {"Details": {"TargetSettingDetail": original_details}}
    2. For settings without Details or with null Details, preserve the original structure
    3. Convert empty lists ([]) to null for backward compatibility
    4. Convert string values to integers for keys ending with "Id" (when valid integers)
    5. Convert PageFeedIds lists to object format: {"long": [int_values]}
    6. Wrap all transformed settings in {"Setting": transformed_settings}

    BiddingScheme field transformations:
    1. Recursively convert all integer values to floats to ensure consistent numeric type handling

    Example Settings transformation:
    Input:  {"Settings": [{"Type": "Target", "Details": [...], "PageFeedIds": ["123", "456"]}]}
    Output: {"Settings": {"Setting": [{"Type": "Target", "Details": {"TargetSettingDetail": [...]}, "PageFeedIds": {"long": [123, 456]}}]}}

    Example BiddingScheme transformation:
    Input:  {"BiddingScheme": {"MaxCpc": {"Amount": 5}}}
    Output: {"BiddingScheme": {"MaxCpc": {"Amount": 5.0}}}
    """

    def transform(
        self,
        record: MutableMapping[str, Any],
        config: Optional[Mapping[str, Any]] = None,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
    ) -> None:
        """
        Transform the record by converting the Settings and BiddingScheme properties.

        Args:
            record: The campaign record to transform (modified in-place)
            config: Optional configuration (unused)
            stream_state: Optional stream state (unused)
            stream_slice: Optional stream slice (unused)
        """
        settings = record.get("Settings")
        bidding_scheme = record.get("BiddingScheme")
        if settings:
            self._transform_settings_property(record, settings)
        if bidding_scheme:
            self._transform_bidding_scheme_property(bidding_scheme)

    def _transform_settings_property(self, record: MutableMapping[str, Any], settings: Any) -> None:
        """
        Transform the Settings property of a campaign record.

        Converts the Settings list into the expected nested structure and applies value
        transformations to individual setting properties.

        Args:
            record: The campaign record containing the Settings (modified in-place)
            settings: The Settings value from the record
        """
        if not isinstance(settings, list) or len(settings) == 0:
            # Keep original value (None, empty list, etc.)
            return

        transformed_settings = []
        for setting in settings:
            if not isinstance(setting, dict):
                # Keep non-dict settings as-is
                transformed_settings.append(setting)
                continue

            if "Details" in setting and setting["Details"] is not None:
                # Wrap Details in TargetSettingDetail only if Details is not None
                transformed_setting = {
                    "Type": setting.get("Type"),
                    "Details": {"TargetSettingDetail": setting["Details"]},
                }
                # Add any other properties that might exist
                for key, value in setting.items():
                    if key not in ["Type", "Details"]:
                        transformed_setting[key] = self._transform_setting_value(key, value)
                transformed_settings.append(transformed_setting)
            else:
                # Keep setting as-is (no Details to wrap or Details is None),
                # but still convert empty lists to null and string IDs to integers
                transformed_setting = {}
                for key, value in setting.items():
                    transformed_setting[key] = self._transform_setting_value(key, value)
                transformed_settings.append(transformed_setting)

        # Wrap the transformed settings in the expected structure
        record["Settings"] = {"Setting": transformed_settings}

    def _transform_setting_value(self, key: str, value: Any) -> Any:
        """
        Transform individual setting values based on key name and value type.

        Applies specific transformations:
        - Empty lists become null
        - PageFeedIds lists become {"long": [int_values]} objects
        - String values for keys ending with "Id" become integers (when valid)

        Args:
            key: The setting property name
            value: The setting property value

        Returns:
            The transformed value
        """
        # Convert empty lists to null for backward compatibility
        if isinstance(value, list) and len(value) == 0:
            return None
        elif key == "PageFeedIds":
            # Convert PageFeedIds list to object with long array
            return self._convert_page_feed_id_lists(value)
        else:
            # Convert string IDs to integers
            return self._convert_id_strings_to_integers(key, value)

    def _transform_bidding_scheme_property(self, bidding_scheme: Any) -> None:
        """
        Transform the BiddingScheme property of a campaign record.

        Recursively converts all integer values to floats for consistent numeric handling.
        """
        if bidding_scheme and isinstance(bidding_scheme, dict):
            self._convert_integers_to_floats(bidding_scheme)

    def _convert_integers_to_floats(self, obj: MutableMapping[str, Any]) -> None:
        """
        Recursively convert integer values to floats in a dictionary.

        This ensures consistent numeric type handling across all BiddingScheme values.
        Only converts values that are whole numbers (int or float with no decimal part).
        """
        if not isinstance(obj, dict):
            return

        for key, value in obj.items():
            if isinstance(value, dict):
                self._convert_integers_to_floats(value)
                continue
            # Convert any whole numbers to float type
            if isinstance(value, (int, float)) and value == int(value):
                obj[key] = float(value)

    def _convert_id_strings_to_integers(self, key: str, value: Any) -> Any:
        """
        Convert string values to integers for keys ending with "Id".

        Only converts if the string represents a valid integer. If conversion fails,
        the original string value is preserved.
        """
        if key.endswith("Id") and isinstance(value, str):
            try:
                return int(value)
            except ValueError:
                # If conversion fails, return original value
                return value
        return value

    def _convert_page_feed_id_lists(self, value: Any) -> Any:
        """
        Convert PageFeedIds from list of strings to object with long array of integers.

        This transformation is required for compatibility with the expected API format.
        If any string cannot be converted to an integer, the original value is returned.

        Example:
        Input:  ["8246337222870", "1234567890"]
        Output: {"long": [8246337222870, 1234567890]}
        """
        if isinstance(value, list) and len(value) > 0:
            try:
                # Convert string IDs to integers
                long_values = [int(id_str) for id_str in value if isinstance(id_str, str)]
                return {"long": long_values}
            except ValueError:
                # If conversion fails, return original value
                return value
        return value
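

# Illustrative sketch (comment only; the record values come from the class docstring above, and
# constructing the transformer with no arguments is an assumption about the CDK base class):
#
#     record = {
#         "Settings": [{"Type": "Target", "PageFeedIds": ["123", "456"]}],
#         "BiddingScheme": {"MaxCpc": {"Amount": 5}},
#     }
#     BingAdsCampaignsRecordTransformer().transform(record)
#     # record["Settings"]      -> {"Setting": [{"Type": "Target", "PageFeedIds": {"long": [123, 456]}}]}
#     # record["BiddingScheme"] -> {"MaxCpc": {"Amount": 5.0}}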


@dataclass
class BulkDatetimeToRFC3339(RecordTransformation):
    """
    The Bing Ads Bulk API provides datetime fields in a custom format with milliseconds:
    "04/27/2023 18:00:14.970".

    Return the datetime in RFC 3339 format: "2023-04-27T18:00:14.970+00:00".
    """

    def transform(
        self,
        record: MutableMapping[str, Any],
        config: Optional[Mapping[str, Any]] = None,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
    ) -> None:
        original_value = record.get("Modified Time")
        if original_value is not None and original_value != "":
            try:
                record["Modified Time"] = (
                    datetime.strptime(original_value, "%m/%d/%Y %H:%M:%S.%f")
                    .replace(tzinfo=timezone.utc)
                    .isoformat(timespec="milliseconds")
                )
            except ValueError:
                # Keep original value if parsing fails; don't set to None
                pass
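

# Illustrative sketch (comment only; mirrors the conversion performed above on the sample value
# from the class docstring):
#
#     raw = "04/27/2023 18:00:14.970"
#     datetime.strptime(raw, "%m/%d/%Y %H:%M:%S.%f").replace(tzinfo=timezone.utc).isoformat(timespec="milliseconds")
#     # -> "2023-04-27T18:00:14.970+00:00"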
""" cursor_field = "Modified Time" def should_migrate(self, stream_state: Mapping[str, Any]) -> bool: if self.cursor_field in stream_state.keys() and stream_state.get(self.cursor_field) is None: return True return False def migrate(self, stream_state: Mapping[str, Any]) -> Mapping[str, Any]: if self.should_migrate(stream_state): state_copy = deepcopy(stream_state) for key, value in state_copy.items(): if not isinstance(value, dict): del stream_state[key] return stream_state @dataclass class CustomReportSchemaLoader(SchemaLoader): """ Creates custom report schema based on provided reporting columns. """ reporting_columns: List[str] report_aggregation: str def get_json_schema(self) -> Mapping[str, Any]: if self.report_aggregation == "DayOfWeek": self.reporting_columns = self.reporting_columns + ["DayOfWeek", "StartOfTimePeriod", "EndOfTimePeriod"] if self.report_aggregation == "HourOfDay": self.reporting_columns = self.reporting_columns + ["HourOfDay", "StartOfTimePeriod", "EndOfTimePeriod"] self.reporting_columns = list(frozenset(self.reporting_columns)) columns_schema = {col: {"type": ["null", "string"]} for col in self.reporting_columns} schema: Mapping[str, Any] = { "$schema": "https://json-schema.org/draft-07/schema#", "type": ["null", "object"], "additionalProperties": True, "properties": columns_schema, } return schema @dataclass class CustomReportTransformation(RecordTransformation): report_aggregation: str def transform_report_hourly_datetime_format_to_rfc_3339(self, original_value: str) -> str: """ Bing Ads API reports with hourly aggregation provides date fields in custom format: "2023-11-04|11" Return date in RFC3339 format: "2023-11-04T11:00:00+00:00" """ return datetime.strptime(original_value, "%Y-%m-%d|%H").replace(tzinfo=timezone.utc).isoformat(timespec="seconds") def transform( self, record: MutableMapping[str, Any], config: Optional[Mapping[str, Any]] = None, stream_state: Optional[StreamState] = None, stream_slice: Optional[StreamSlice] = None, ) -> None: if self.report_aggregation == "Hourly": if record.get("TimePeriod"): record.update({"TimePeriod": self.transform_report_hourly_datetime_format_to_rfc_3339(record["TimePeriod"])}) if self.report_aggregation == "DayOfWeek": cursor_field = record["TimePeriod"] record.update( { "StartOfTimePeriod": stream_slice["start_time"], "EndOfTimePeriod": stream_slice["end_time"], "DayOfWeek": cursor_field, "TimePeriod": stream_slice["end_time"], } ) record["TimePeriod"] = record["EndOfTimePeriod"] if self.report_aggregation == "HourOfDay": cursor_field = record["TimePeriod"] record.update( { "StartOfTimePeriod": stream_slice["start_time"], "EndOfTimePeriod": stream_slice["end_time"], "HourOfDay": cursor_field, "TimePeriod": stream_slice["end_time"], } ) @dataclass class BingAdsGzipCsvDecoder(Decoder): """ Custom decoder that always attempts GZip decompression before parsing CSV data. This is needed because Bing Ads now sends GZip compressed files from Azure Blob Storage without proper compression headers, so the standard GzipDecoder fails to detect compression. """ def is_stream_response(self) -> bool: return False def decode(self, response) -> Generator[MutableMapping[str, Any], None, None]: """ Always attempt GZip decompression first, then fall back to plain CSV if that fails. 
""" try: # First, try to decompress as GZip decompressed_content = gzip.decompress(response.content) # Parse as CSV with utf-8-sig encoding (handles BOM) text_content = decompressed_content.decode("utf-8-sig") csv_reader = csv.DictReader(StringIO(text_content)) for row in csv_reader: yield row except (gzip.BadGzipFile, OSError): # If GZip decompression fails, try parsing as plain CSV try: text_content = response.content.decode("utf-8-sig") csv_reader = csv.DictReader(StringIO(text_content)) for row in csv_reader: yield row except Exception as e: # If both fail, log the error and yield empty import logging logger = logging.getLogger(__name__) logger.error(f"Failed to parse response as either GZip or plain CSV: {e}") yield {}