184 lines
6.6 KiB
Python
184 lines
6.6 KiB
Python
#
|
|
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
|
|
#
|
|
|
|
from dataclasses import dataclass, field
|
|
from typing import Any, Dict, List, Mapping, Optional
|
|
|
|
from airbyte_cdk.sources.declarative.migrations.state_migration import StateMigration
|
|
from airbyte_cdk.sources.declarative.schema import SchemaLoader
|
|
from airbyte_cdk.sources.declarative.transformations import RecordTransformation
|
|
from airbyte_cdk.sources.types import Config, StreamSlice, StreamState
|
|
|
|
|
|
@dataclass
class NestedSubstreamStateMigration(StateMigration):
    """
    Migrate legacy SearchAnalytics state into the per-partition state format.

    We require a custom state migration because SearchAnalytics streams contain two nested levels of
    substreams (site URL, then search type). The existing LegacyToPerPartitionStateMigration only
    handles one level.

    Legacy state format is as follows:
    {
      "date": "2025-05-28",
      "https://www.example.com/": {
        "web": {
          "date": "2025-05-25"
        },
        "news": {
          "date": "2023-05-22"
        }
      }
    }

    The resulting migrated per-partition state is:
    {
      "use_global_cursor": false,
      "states": [
        {
          "partition": {
            "search_type": "web",
            "site_url": "https://www.example.com/"
          },
          "cursor": {
            "date": "2025-05-25"
          }
        },
        {
          "partition": {
            "search_type": "news",
            "site_url": "https://www.example.com/"
          },
          "cursor": {
            "date": "2023-05-22"
          }
        }
      ]
    }

    The legacy top-level "date" value is intentionally NOT carried over into the migrated state.
    """

    def should_migrate(self, stream_state: Mapping[str, Any]) -> bool:
        """
        Return True when ``stream_state`` is a non-empty legacy (nested) state.

        A state that already uses the per-partition format carries a ``states`` list and/or a
        ``use_global_cursor`` flag. Neither key can appear in the legacy format, whose top-level
        keys are ``date`` and site URLs, so the presence of either means there is nothing to migrate.
        """
        if not stream_state:
            return False
        return "states" not in stream_state and "use_global_cursor" not in stream_state

    def migrate(self, stream_state: Mapping[str, Any]) -> Mapping[str, Any]:
        """
        Flatten the nested legacy state into one entry per (site_url, search_type) partition.

        :param stream_state: legacy state mapping each site URL to a ``{search_type: cursor}``
            mapping, plus an optional top-level ``date`` key.
        :return: per-partition state with ``use_global_cursor`` set to False and one ``states``
            entry per nested search-type cursor.
        """
        per_partition_state = []
        for site_url, search_type_state in stream_state.items():
            if site_url == "date":
                # The legacy state also contains a global cursor value under the `date` key, which
                # equates to global state.
                #
                # However, the legacy Python implementation does not appear to be implemented
                # correctly and simply saves the state of the last seen partition. Since that value
                # is not trustworthy, and in the current implementation global state is applied to
                # partitions without an existing value, the global value is deliberately not migrated.
                continue
            if not isinstance(search_type_state, Mapping):
                # Only nested mappings describe partitions; skip any other scalar value instead of
                # failing on `.items()`.
                continue
            for search_type, cursor in search_type_state.items():
                per_partition_state.append(
                    {
                        "partition": {"site_url": site_url, "search_type": search_type},
                        "cursor": cursor,
                    }
                )

        return {
            "use_global_cursor": False,
            "states": per_partition_state,
        }
|
|
|
|
|
|
@dataclass
class CustomReportExtractDimensionsFromKeys(RecordTransformation):
    """
    A record transformation that remaps each value in the ``keys`` array back to its associated
    dimension.

    This is a custom component because we're unable to use list comprehensions, and enumerate() is
    not a valid function in our Jinja context. As a result we can't iterate over the dimensions
    defined in the config to create each field transformation on the stream_template for each
    custom report.

    If we were able to, the ComponentMappingDefinition would look roughly like this (illustrative
    pseudo-config only):

        type: ComponentMappingDefinition
        field_path:
          - transformations
          - "1"
          - fields
        value: "{{ [{'path': [dimension], 'value': '{{ record['keys'][index] }}'} for index, dimension in enumerate(components_values['dimensions'])] }}"

    or

        type: ComponentMappingDefinition
        field_path:
          - transformations
          - "1"
          - fields
        value: >
          {% for index, dimension in enumerate(components_values["dimensions"]) %}
          - type: AddFields
            fields:
              - path: [ {{ dimension }} ]
                value: "{{ record['keys'][index] }}"
          {% endfor %}
    """

    # Dimension names, in the same order as the values in each record's ``keys`` array.
    dimensions: List[str] = field(default_factory=list)

    def transform(
        self,
        record: Dict[str, Any],
        config: Optional[Config] = None,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
    ) -> None:
        """
        Move each positional value of ``record["keys"]`` onto a field named after its dimension.

        The record is modified in place. ``keys`` is removed from it, and the i-th key value is
        stored under ``self.dimensions[i]``.

        Records without a ``keys`` field (or with ``keys`` set to None) are left unchanged. If
        ``keys`` holds fewer values than there are dimensions, the trailing dimensions are left
        unset. Surplus key values are dropped.

        :param record: the record to modify in place.
        :param config: unused.
        :param stream_state: unused.
        :param stream_slice: unused.
        """
        keys = record.pop("keys", None) or []
        # Pair values with dimensions positionally; avoids repeated list.pop(0) on the input list.
        for dimension, value in zip(self.dimensions, keys):
            record[dimension] = value
|
|
|
|
|
|
@dataclass
class CustomReportSchemaLoader(SchemaLoader):
    """
    Schema loader for Google Search Console custom report streams.

    A custom component is needed because the schema of a custom report stream depends on which
    dimensions are selected in the config. Right now, only DynamicSchemaLoader, which is based on
    the response from an API endpoint, supports remapping of types to Airbyte schema types. This
    CustomReportSchemaLoader functions more like a static schema loader, so we must perform the
    remapping in this custom component.
    """

    # Maps each supported dimension name to the list of JSON-schema property definitions
    # (``{property_name: property_schema}``) it contributes. Shared by all instances; never mutate.
    DIMENSION_TO_PROPERTY_SCHEMA_MAP = {
        "country": [{"country": {"type": ["null", "string"]}}],
        "date": [{"date": {"type": ["null", "string"], "format": "date"}}],
        "device": [{"device": {"type": ["null", "string"]}}],
        "page": [{"page": {"type": ["null", "string"]}}],
        "query": [{"query": {"type": ["null", "string"]}}],
    }

    # Dimension names selected for this custom report.
    dimensions: List[str]

    def get_json_schema(self) -> Mapping[str, Any]:
        """
        Build the JSON schema for a custom report stream.

        The schema always contains the metric fields and the ``search_type``/``site_url`` fields,
        plus one property per configured dimension.

        :return: a freshly built schema dict; it shares no mutable state with this class.
        :raises KeyError: if a configured dimension is not in DIMENSION_TO_PROPERTY_SCHEMA_MAP.
        """
        schema: Dict[str, Any] = {
            "$schema": "https://json-schema.org/draft-07/schema#",
            "type": ["null", "object"],
            "additionalProperties": True,
            "properties": {
                # metrics
                "clicks": {"type": ["null", "integer"]},
                "ctr": {"type": ["null", "number"], "multipleOf": 1e-25},
                "impressions": {"type": ["null", "integer"]},
                "position": {"type": ["null", "number"], "multipleOf": 1e-25},
                # default fields
                "search_type": {"type": ["null", "string"]},
                "site_url": {"type": ["null", "string"]},
            },
        }

        # dimensions
        schema["properties"].update(self._dimension_to_property_schema())
        return schema

    def _dimension_to_property_schema(self) -> dict:
        """
        Build the JSON-schema properties contributed by the configured dimensions.

        Dimensions are processed in sorted order so the resulting property order is deterministic.
        Each property definition is deep-copied so that callers mutating the returned schema
        cannot corrupt the class-level DIMENSION_TO_PROPERTY_SCHEMA_MAP.

        :raises KeyError: if a configured dimension is not in DIMENSION_TO_PROPERTY_SCHEMA_MAP.
        """
        import copy

        properties: Dict[str, Any] = {}
        for dimension in sorted(self.dimensions):
            for property_schema in self.DIMENSION_TO_PROPERTY_SCHEMA_MAP[dimension]:
                properties.update(copy.deepcopy(property_schema))
        return properties
|