Fixes https://github.com/airbytehq/oncall/issues/9565 ## What When migrating to the low-code + manifest-only format, we didn't migrate the dimensionFilter feature. This change adds it back so that requests to Google's API include the dimension filter in the request body. ## How Two things are done now: 1. We take the incoming config and transform it into the format expected by Google's API. This is a direct port of what we used to do as a pre-sync transformation: https://github.com/airbytehq/airbyte/pull/60342/files#diff-c598765182c592504650290115e49900ef4b473307ddd69ea1662e6a6865cddfL206-L223 . This also avoids a breaking change. 2. In the stream template, we define the interpolation that injects the dimensionFilter (if it exists) into the request body; it is applied to every dynamic stream created from a custom_report. ## Review guide 1. `components.py` 2. `manifest.yaml` ## User Impact Should only be additive, and it fixes the issue for affected customers. ## Can this PR be safely reverted and rolled back? - [x] YES 💚 - [ ] NO ❌
209 lines
9.0 KiB
Python
209 lines
9.0 KiB
Python
#
|
|
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
|
|
#
|
|
|
|
from dataclasses import dataclass
|
|
from itertools import islice
|
|
from typing import Any, Dict, Iterable, List, Mapping, MutableMapping, Optional
|
|
|
|
import requests
|
|
|
|
from airbyte_cdk.sources.declarative.extractors.record_extractor import RecordExtractor
|
|
from airbyte_cdk.sources.declarative.transformations import RecordTransformation
|
|
from airbyte_cdk.sources.types import Config, StreamSlice, StreamState
|
|
|
|
|
|
@dataclass
class CombinedExtractor(RecordExtractor):
    """
    Merge the records produced by several sub-extractors, position by position.

    Every extractor in ``extractors`` is run against the same response. The resulting record
    streams are walked in lockstep: the n-th record of every stream is folded into one dict,
    in extractor order, so a key produced by a later extractor overwrites the same key from an
    earlier one. Iteration stops as soon as the shortest stream runs out (``zip`` semantics).

    Example:
        keys_extractor -> yields: [{"name": "Alice", "age": 30}]
        extra_data_extractor -> yields: [{"country": "US"}]
        CombinedExtractor(extractors=[keys_extractor, extra_data_extractor]) ->
            yields: [{"name": "Alice", "age": 30, "country": "US"}]
    """

    extractors: List[RecordExtractor]

    def extract_records(self, response: requests.Response) -> Iterable[MutableMapping[Any, Any]]:
        # Start every sub-extractor up front, then consume their outputs side by side.
        record_streams = [sub_extractor.extract_records(response) for sub_extractor in self.extractors]
        yield from map(self._merge_records, zip(*record_streams))

    @staticmethod
    def _merge_records(aligned_records: Iterable[Mapping[Any, Any]]) -> Dict[Any, Any]:
        """Fold one group of aligned records into a single dict; later records win on key clashes."""
        combined: Dict[Any, Any] = {}
        for partial_record in aligned_records:
            combined.update(partial_record)
        return combined
|
|
|
|
|
|
@dataclass
class KeyValueExtractor(RecordExtractor):
    """
    Extractor that combines keys and values from two separate extractors.

    The `keys_extractor` and `values_extractor` extract records independently
    from the response. Their outputs are zipped together to form key-value mappings.

    Each key from `keys_extractor` should correspond to a key in the resulting dictionary,
    and each value from `values_extractor` is the value for that key.

    Example:
        keys_extractor -> yields: ["name", "age"]
        values_extractor -> yields: ["Alice", 30, "Brian", 32]
        result: { "name": "Alice", "age": 30 } and { "name": "Brian", "age": 32 }

    ---
    More specifically, we:
    - Start with a list of keys, e.g. ['date', 'firstUserCampaignName', 'firstUserMedium'].
    - Receive a flat sequence of values in that same order, e.g.:
      ['20231001', 'TikTok-Conversion', 'ads', '20231001', 'TikTok-Mahta', 'ads'].
    - Because there are three keys, we split the flat list into chunks of three values each.
      Each chunk maps to the keys to form one dictionary:
      {
        "date": "20231001",
        "firstUserCampaignName": "TikTok-Conversion",
        "firstUserMedium": "ads"
      }
      and so on.

    This allows transforming a flat list of values into structured records using a predefined key schema.

    Edge cases:
    - No keys: nothing is yielded (there is no schema to map values onto).
    - A trailing chunk with fewer values than keys is dropped.
    - ``None`` values are kept as regular values; they do not end extraction.
    """

    keys_extractor: RecordExtractor
    values_extractor: RecordExtractor

    def extract_records(self, response: requests.Response) -> Iterable[MutableMapping[Any, Any]]:
        keys = list(self.keys_extractor.extract_records(response))
        if not keys:
            # With no keys every chunk would be empty and we would yield {} forever.
            return

        # iter() accepts both generators and plain lists returned by the values extractor.
        values = iter(self.values_extractor.extract_records(response))
        chunk_size = len(keys)

        while True:
            # islice detects exhaustion by length, so a legitimate None value in the data
            # no longer masquerades as "end of stream".
            chunk = list(islice(values, chunk_size))
            if len(chunk) < chunk_size:
                # Values exhausted; an incomplete trailing chunk is discarded.
                return
            yield dict(zip(keys, chunk))
|
|
|
|
|
|
@dataclass
class DimensionFilterConfigTransformation(RecordTransformation):
    """
    Custom component that takes the incoming config and loops over each custom report and
    transforms the dimensionFilter definition into the format required by the Google Analytics
    API.

    This is not a config migration, but rather an always performed transformation and requires
    a custom component because this is the easiest way to consolidate all the transformation
    logic in one place. The alternative would be a very complex interpolation of the request body.
    """

    def transform(
        self,
        record: MutableMapping[str, Any],
        config: Optional[Mapping[str, Any]] = None,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
    ) -> None:
        """
        Rewrite, in place, the ``dimensionFilter`` of each entry in ``record["custom_reports_array"]``.

        ``record`` is the incoming connector config. ``config``, ``stream_state`` and
        ``stream_slice`` are accepted for interface compatibility and are not used.

        Reports whose ``dimensionFilter`` is absent, null or empty are left untouched (an empty
        filter would transform to ``{}`` anyway, and a null one used to crash).

        NOTE(review): not idempotent. An already-transformed filter has no ``filter_type`` key
        and would be replaced by ``{}`` if this ran twice on the same config; confirm it runs once.
        """
        # `or []` tolerates an explicit null for the array, which would otherwise fail to iterate.
        for custom_report in record.get("custom_reports_array") or []:
            dimension_filter = custom_report.get("dimensionFilter")
            if dimension_filter:
                custom_report["dimensionFilter"] = self.transform_dimension_filter(dimension_filter)

    def transform_dimension_filter(self, dimension_filter: Mapping[str, Any]) -> Mapping[str, Any]:
        """
        Convert a dimension filter from our config settings into Google's FilterExpression shape:
        https://developers.google.com/analytics/devguides/reporting/data/v1/rest/v1beta/FilterExpression

        Returns a new mapping. Returns ``{}`` when ``filter_type`` is missing or unknown, when a
        group has no expressions, or when a ``notExpression`` has no expression.
        """
        filter_type = dimension_filter.get("filter_type")

        if filter_type in ("andGroup", "orGroup"):
            expressions = dimension_filter.get("expressions") or []
            transformed_expressions = [self.transform_expression(exp) for exp in expressions]
            return {filter_type: {"expressions": transformed_expressions}} if transformed_expressions else {}

        if filter_type == "notExpression":
            expression = dimension_filter.get("expression")
            # Mirror the empty-group behavior instead of crashing on a missing expression.
            return {filter_type: self.transform_expression(expression)} if expression else {}

        if filter_type == "filter":
            # A single filter carries field_name/filter at the top level of the dimension filter.
            return self.transform_expression(dimension_filter)

        return {}

    def transform_expression(self, expression: Mapping[str, Any]) -> Mapping[str, Any]:
        """
        Convert one config filter expression into Google's ``{"filter": {...}}`` shape:
        https://developers.google.com/analytics/devguides/reporting/data/v1/rest/v1beta/FilterExpression#filter

        A missing or unrecognised ``filter_name`` yields a filter that contains only ``fieldName``.
        """
        transformed_expression: Dict[str, Any] = {"fieldName": expression.get("field_name")}
        filter_config = expression.get("filter") or {}

        converters = {
            "stringFilter": self.transform_string_filter,
            "inListFilter": self.transform_in_list_filter,
            "numericFilter": self.transform_numeric_filter,
            "betweenFilter": self.transform_between_filter,
        }
        converter = converters.get(filter_config.get("filter_name"))
        if converter is not None:
            transformed_expression.update(converter(filter_config))

        return {"filter": transformed_expression}

    @staticmethod
    def _first_choice(value: Any) -> Any:
        """
        Return the selected enum value of a config field that the original code indexes with ``[0]``.

        A plain string is returned whole, rather than sliced to its first character. An empty
        or unsupported value yields ``None`` so the caller can omit the field.
        """
        if isinstance(value, str):
            return value
        if isinstance(value, (list, tuple)) and value:
            return value[0]
        return None

    @staticmethod
    def transform_string_filter(filter: Mapping[str, Any]) -> Mapping[str, Any]:
        """Build Google's ``stringFilter`` from the config's value/matchType/caseSensitive."""
        string_filter: Dict[str, Any] = {"value": filter.get("value")}
        match_type = DimensionFilterConfigTransformation._first_choice(filter.get("matchType"))
        if match_type is not None:
            string_filter["matchType"] = match_type
        if "caseSensitive" in filter:
            string_filter["caseSensitive"] = filter.get("caseSensitive")
        return {"stringFilter": string_filter}

    @staticmethod
    def transform_in_list_filter(filter: Mapping[str, Any]) -> Mapping[str, Any]:
        """Build Google's ``inListFilter`` from the config's values/caseSensitive."""
        in_list_filter: Dict[str, Any] = {"values": filter.get("values")}
        if "caseSensitive" in filter:
            in_list_filter["caseSensitive"] = filter.get("caseSensitive")
        return {"inListFilter": in_list_filter}

    @staticmethod
    def transform_numeric_filter(filter: Mapping[str, Any]) -> Mapping[str, Any]:
        """Build Google's ``numericFilter``; the value is keyed by its ``value_type``."""
        value = filter.get("value")
        numeric_filter: Dict[str, Any] = {
            "value": {value.get("value_type"): value.get("value")},
        }
        operation = DimensionFilterConfigTransformation._first_choice(filter.get("operation"))
        if operation is not None:
            numeric_filter["operation"] = operation
        return {"numericFilter": numeric_filter}

    @staticmethod
    def _to_numeric_value(bound: Mapping[str, Any]) -> Dict[str, Any]:
        """
        Convert a config bound ``{"value_type": ..., "value": ...}`` into ``{value_type: value}``.

        A ``doubleValue`` given as a string is parsed with ``float``. The input is not mutated.
        """
        value_type = bound.get("value_type")
        value = bound.get("value")
        if value_type == "doubleValue" and isinstance(value, str):
            value = float(value)
        return {value_type: value}

    @staticmethod
    def transform_between_filter(filter: Mapping[str, Any]) -> Mapping[str, Any]:
        """Build Google's ``betweenFilter`` from the config's fromValue/toValue bounds."""
        # Bounds are converted into fresh dicts so the caller's config is not modified.
        from_value = DimensionFilterConfigTransformation._to_numeric_value(filter.get("fromValue"))
        to_value = DimensionFilterConfigTransformation._to_numeric_value(filter.get("toValue"))
        return {
            "betweenFilter": {
                "fromValue": from_value,
                "toValue": to_value,
            }
        }
|