# airbyte/airbyte-integrations/connectors/source-google-search-console/components.py
#
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
#

from dataclasses import dataclass, field
from typing import Any, Dict, List, Mapping, Optional

from airbyte_cdk.sources.declarative.migrations.state_migration import StateMigration
from airbyte_cdk.sources.declarative.schema import SchemaLoader
from airbyte_cdk.sources.declarative.transformations import RecordTransformation
from airbyte_cdk.sources.types import Config, StreamSlice, StreamState

@dataclass
class NestedSubstreamStateMigration(StateMigration):
    """
    We require a custom state migration because SearchAnalytics streams contain two nested levels of
    substreams. The existing LegacyToPerPartitionStateMigration only handles one level.

    The legacy state format is as follows:
    {
        "date": "2025-05-28",
        "https://www.example.com/": {
            "web": {
                "date": "2025-05-25"
            },
            "news": {
                "date": "2023-05-22"
            }
        }
    }

    The resulting migrated per-partition state is:
    {
        "use_global_cursor": false,
        "states": [
            {
                "partition": {
                    "search_type": "web",
                    "site_url": "https://www.example.com/"
                },
                "cursor": {
                    "date": "2025-05-25"
                }
            },
            {
                "partition": {
                    "search_type": "news",
                    "site_url": "https://www.example.com/"
                },
                "cursor": {
                    "date": "2023-05-22"
                }
            }
        ]
    }
    """
    def should_migrate(self, stream_state: Mapping[str, Any]) -> bool:
        return len(stream_state) > 0 and "states" not in stream_state

    def migrate(self, stream_state: Mapping[str, Any]) -> Mapping[str, Any]:
        per_partition_state = []
        for site_url_key, search_type_state in stream_state.items():
            if site_url_key == "date":
                # The legacy state also contains a global cursor value under the `date` key, which
                # equates to global state.
                #
                # However, the legacy Python implementation does not appear to compute this value
                # correctly and simply saves the state of the last seen partition. Since I don't
                # trust the legacy value, and in the current implementation global state is applied
                # to partitions without an existing value, I'm making a conscious choice to not
                # migrate the global value.
                continue
            site_url = site_url_key
            for search_type_key, cursor in search_type_state.items():
                per_partition_state.append(
                    {"partition": {"site_url": site_url, "search_type": search_type_key}, "cursor": cursor}
                )
        return {
            "use_global_cursor": False,
            "states": per_partition_state,
        }
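

# A minimal usage sketch (not part of the connector's runtime path), exercising the
# migration on the hypothetical legacy state from the docstring above. The function
# name is illustrative only.
def _example_nested_substream_state_migration() -> None:
    migration = NestedSubstreamStateMigration()
    legacy_state = {
        "date": "2025-05-28",
        "https://www.example.com/": {
            "web": {"date": "2025-05-25"},
            "news": {"date": "2023-05-22"},
        },
    }
    assert migration.should_migrate(legacy_state)
    migrated = migration.migrate(legacy_state)
    # One per-partition entry per (site_url, search_type) pair; the top-level "date"
    # key is intentionally dropped (see the comment in migrate()).
    assert migrated["use_global_cursor"] is False
    assert {
        "partition": {"site_url": "https://www.example.com/", "search_type": "web"},
        "cursor": {"date": "2025-05-25"},
    } in migrated["states"]
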
@dataclass
class CustomReportExtractDimensionsFromKeys(RecordTransformation):
    """
    A record transformation that remaps each value in the keys array back to its associated
    dimension. This needs to be a custom component because we're unable to use a list
    comprehension, and enumerate() is not a valid function in our Jinja context, so we can't
    iterate over the dimensions defined in the config to create each field transformation on the
    stream_template for each custom report.

    If we were able to, the actual ComponentMappingDefinition would look like this:

    type: ComponentMappingDefinition
    field_path:
      - transformations
      - "1"
      - fields
    value: "{{ [{'path': [dimension], 'value': record['keys'][index]} for index, dimension in enumerate(components_values['dimensions'])] }}"

    or

    type: ComponentMappingDefinition
    field_path:
      - transformations
      - "1"
      - fields
    value: >
      {% for index, dimension in enumerate(components_values["dimensions"]) %}
      - type: AddFields
        fields:
          - path: [ {{ dimension }} ]
            value: "{{ record['keys'][index] }}"
      {% endfor %}
    """

    dimensions: List[str] = field(default_factory=list)
    def transform(
        self,
        record: Dict[str, Any],
        config: Optional[Config] = None,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
    ) -> None:
        # Mutates the record in place: pop one value off `keys` per configured dimension,
        # then drop the now-empty `keys` array.
        for dimension in self.dimensions:
            record[dimension] = record["keys"].pop(0)
        record.pop("keys")
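

# A minimal usage sketch with a hypothetical record, assuming `dimensions` is ordered the
# same way as the values in the record's `keys` array (as the Search Analytics API returns
# them). The function name and record values are illustrative only.
def _example_extract_dimensions_from_keys() -> None:
    transformation = CustomReportExtractDimensionsFromKeys(dimensions=["date", "country"])
    record = {"keys": ["2025-05-25", "usa"], "clicks": 7}
    transformation.transform(record)
    assert record == {"date": "2025-05-25", "country": "usa", "clicks": 7}
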
@dataclass
class CustomReportSchemaLoader(SchemaLoader):
    """
    A custom schema loader is needed for Google Search Console's custom report streams
    because the schema depends on which dimensions are selected in the config. Right now,
    only DynamicSchemaLoader, which builds the schema from an API endpoint's response,
    supports remapping of types to Airbyte schema types. This CustomReportSchemaLoader
    functions more like a static schema loader, so we must perform the remapping in this
    custom component.
    """

    DIMENSION_TO_PROPERTY_SCHEMA_MAP = {
        "country": [{"country": {"type": ["null", "string"]}}],
        "date": [{"date": {"type": ["null", "string"], "format": "date"}}],
        "device": [{"device": {"type": ["null", "string"]}}],
        "page": [{"page": {"type": ["null", "string"]}}],
        "query": [{"query": {"type": ["null", "string"]}}],
    }

    dimensions: List[str]
    def get_json_schema(self) -> Mapping[str, Any]:
        schema: Mapping[str, Any] = {
            "$schema": "https://json-schema.org/draft-07/schema#",
            "type": ["null", "object"],
            "additionalProperties": True,
            "properties": {
                # metrics
                "clicks": {"type": ["null", "integer"]},
                "ctr": {"type": ["null", "number"], "multipleOf": 1e-25},
                "impressions": {"type": ["null", "integer"]},
                "position": {"type": ["null", "number"], "multipleOf": 1e-25},
                # default fields
                "search_type": {"type": ["null", "string"]},
                "site_url": {"type": ["null", "string"]},
            },
        }

        # dimensions
        dimension_properties = self._dimension_to_property_schema()
        schema["properties"].update(dimension_properties)
        return schema
    def _dimension_to_property_schema(self) -> dict:
        properties = {}
        for dimension in sorted(self.dimensions):
            for property_schema in self.DIMENSION_TO_PROPERTY_SCHEMA_MAP[dimension]:
                properties = {**properties, **property_schema}
        return properties
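

# A minimal usage sketch: the generated schema combines the static metric and default
# properties with one property per configured dimension. The function name and the
# chosen dimensions are illustrative only.
def _example_custom_report_schema_loader() -> None:
    loader = CustomReportSchemaLoader(dimensions=["date", "country"])
    schema = loader.get_json_schema()
    assert schema["properties"]["date"] == {"type": ["null", "string"], "format": "date"}
    assert schema["properties"]["country"] == {"type": ["null", "string"]}
    assert "clicks" in schema["properties"]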