#
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
#

import logging
from dataclasses import InitVar, dataclass, field
from datetime import timedelta
from typing import Any, Dict, Iterable, List, Mapping, MutableMapping, Optional, Union

import dpath
import requests

from airbyte_cdk import (
    BearerAuthenticator,
    DpathExtractor,
    RecordSelector,
    SimpleRetriever,
)
from airbyte_cdk.sources.declarative.auth.oauth import DeclarativeOauth2Authenticator
from airbyte_cdk.sources.declarative.auth.selective_authenticator import SelectiveAuthenticator
from airbyte_cdk.sources.declarative.auth.token_provider import InterpolatedStringTokenProvider
from airbyte_cdk.sources.declarative.datetime.datetime_parser import DatetimeParser
from airbyte_cdk.sources.declarative.decoders import Decoder, JsonDecoder
from airbyte_cdk.sources.declarative.extractors.http_selector import HttpSelector
from airbyte_cdk.sources.declarative.extractors.record_extractor import RecordExtractor
from airbyte_cdk.sources.declarative.interpolation import InterpolatedString
from airbyte_cdk.sources.declarative.migrations.state_migration import StateMigration
from airbyte_cdk.sources.declarative.partition_routers.list_partition_router import ListPartitionRouter
from airbyte_cdk.sources.declarative.requesters import HttpRequester
from airbyte_cdk.sources.declarative.requesters.error_handlers.backoff_strategies import (
    ExponentialBackoffStrategy,
    WaitTimeFromHeaderBackoffStrategy,
)
from airbyte_cdk.sources.declarative.requesters.error_handlers.default_error_handler import DefaultErrorHandler
from airbyte_cdk.sources.declarative.requesters.error_handlers.http_response_filter import HttpResponseFilter
from airbyte_cdk.sources.declarative.requesters.paginators.strategies.pagination_strategy import PaginationStrategy
from airbyte_cdk.sources.declarative.requesters.request_options import InterpolatedRequestOptionsProvider
from airbyte_cdk.sources.declarative.requesters.requester import Requester
from airbyte_cdk.sources.declarative.schema.schema_loader import SchemaLoader
from airbyte_cdk.sources.declarative.transformations import RecordTransformation
from airbyte_cdk.sources.types import Config, Record, StreamSlice, StreamState
from airbyte_cdk.sources.utils.transform import TransformConfig, TypeTransformer
from airbyte_cdk.utils.datetime_helpers import AirbyteDateTime, ab_datetime_format, ab_datetime_now, ab_datetime_parse


logger = logging.getLogger("airbyte")


@dataclass
class NewtoLegacyFieldTransformation(RecordTransformation):
    """
    Implements a custom transformation which adds the legacy field equivalent of v2 fields for streams which contain Deals and Contacts entities.

    This custom implementation was developed in lieu of the AddFields component due to the dynamic nature of the record properties for the HubSpot source.

    For example:
    hs_v2_date_exited_{stage_id} -> hs_date_exited_{stage_id} where {stage_id} is a user-generated value
    """

    field_mapping: Mapping[str, str]

    def transform(
        self,
        record_or_schema: Dict[str, Any],
        config: Optional[Config] = None,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
    ) -> None:
        """
        Transform a record or schema in place by adding the legacy equivalent of each v2 field, so that syncs relying on the legacy fields are not broken.

        :param record_or_schema: The input record or schema to be transformed.
        """
        is_record = record_or_schema.get("properties") is not None

        for field, value in list(record_or_schema.get("properties", record_or_schema).items()):
            for legacy_field, new_field in self.field_mapping.items():
                if new_field in field:
                    transformed_field = field.replace(new_field, legacy_field)

                    if legacy_field == "hs_lifecyclestage_" and not transformed_field.endswith("_date"):
                        transformed_field += "_date"

                    if is_record:
                        if record_or_schema["properties"].get(transformed_field) is None:
                            record_or_schema["properties"][transformed_field] = value
                    else:
                        if record_or_schema.get(transformed_field) is None:
                            record_or_schema[transformed_field] = value


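# Illustrative usage of NewtoLegacyFieldTransformation (a sketch; the mapping below is taken from the
# docstring example above, not from the connector's manifest):
#
#   transformation = NewtoLegacyFieldTransformation(field_mapping={"hs_date_exited_": "hs_v2_date_exited_"})
#   record = {"properties": {"hs_v2_date_exited_appointmentscheduled": "2024-05-01T00:00:00Z"}}
#   transformation.transform(record)
#   # record["properties"] now also contains "hs_date_exited_appointmentscheduled" with the same value.
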
class MigrateEmptyStringState(StateMigration):
    cursor_field: str
    config: Config
    cursor_format: Optional[str] = None

    def __init__(self, cursor_field, config: Config, cursor_format: Optional[str] = None):
        self.cursor_field = cursor_field
        self.cursor_format = cursor_format
        self.config = config

    def migrate(self, stream_state: Mapping[str, Any]) -> Mapping[str, Any]:
        # If a start date wasn't provided in the config, the default date is used
        start_date = self.config.get("start_date", "2006-06-01T00:00:00.000Z")
        if self.cursor_format:
            dt = ab_datetime_parse(start_date)
            formatted_start_date = DatetimeParser().format(dt, self.cursor_format)
            return {self.cursor_field: formatted_start_date}

        return {self.cursor_field: start_date}

    def should_migrate(self, stream_state: Mapping[str, Any]) -> bool:
        return stream_state.get(self.cursor_field) == ""


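# Illustrative behavior of MigrateEmptyStringState (a sketch; the cursor field name and config are hypothetical):
#
#   migration = MigrateEmptyStringState(cursor_field="updatedAt", config={})
#   migration.should_migrate({"updatedAt": ""})  # True: empty-string cursors are migrated
#   migration.migrate({"updatedAt": ""})         # {"updatedAt": "2006-06-01T00:00:00.000Z"}, the default start date
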
@dataclass
class HubspotPropertyHistoryExtractor(RecordExtractor):
    """
    Custom record extractor which parses the JSON response from Hubspot and, for each instance returned for the specified
    object type (ex. Contacts, Deals, etc.), yields records for every requested property. Because this is a property
    history stream, an individual property can yield multiple records representing the previous versions of that property.

    The custom behavior of this component is:
    - Iterating over and extracting property history instances as individual records
    - Injecting fields from outer levels of the response into yielded records to be used as primary keys
    """

    field_path: List[Union[InterpolatedString, str]]
    entity_primary_key: str
    additional_keys: Optional[List[str]]
    config: Config
    parameters: InitVar[Mapping[str, Any]]
    decoder: Decoder = field(default_factory=lambda: JsonDecoder(parameters={}))

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        self._field_path = [InterpolatedString.create(path, parameters=parameters) for path in self.field_path]
        for path_index in range(len(self.field_path)):
            if isinstance(self.field_path[path_index], str):
                self._field_path[path_index] = InterpolatedString.create(self.field_path[path_index], parameters=parameters)

    def extract_records(self, response: requests.Response) -> Iterable[Mapping[str, Any]]:
        for body in self.decoder.decode(response):
            results = []
            if len(self._field_path) == 0:
                extracted = body
            else:
                path = [path.eval(self.config) for path in self._field_path]
                if "*" in path:
                    extracted = dpath.values(body, path)
                else:
                    extracted = dpath.get(body, path, default=[])  # type: ignore # extracted will be a MutableMapping, given input data structure
            if isinstance(extracted, list):
                results = extracted
            elif extracted:
                raise ValueError("field_path should always point towards a list field in the response body for property_history streams")

            for result in results:
                properties_with_history = result.get("propertiesWithHistory")
                primary_key = result.get("id")
                additional_keys = (
                    {additional_key: result.get(additional_key) for additional_key in self.additional_keys} if self.additional_keys else {}
                )

                if properties_with_history:
                    for property_name, value_dict in properties_with_history.items():
                        if property_name == "hs_lastmodifieddate":
                            # Skipping the lastmodifieddate since it only returns the value
                            # when any field of a record was changed, no matter which
                            # field was changed. It therefore creates overhead, since for
                            # every changed property there would be both the date it was changed
                            # and a change in the lastmodifieddate field.
                            continue
                        for version in value_dict:
                            version["property"] = property_name
                            version[self.entity_primary_key] = primary_key
                            yield version | additional_keys


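# Illustrative input/output for HubspotPropertyHistoryExtractor (a sketch with hypothetical values):
#
#   extracted result: {"id": "1", "propertiesWithHistory": {"firstname": [{"value": "Ann", "timestamp": "..."}]}}
#   with entity_primary_key="vid" and additional_keys=None, the extractor yields:
#   {"value": "Ann", "timestamp": "...", "property": "firstname", "vid": "1"}
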
@dataclass
class AddFieldsFromEndpointTransformation(RecordTransformation):
    """
    Makes a request to the provided endpoint and updates the record with the retrieved data.

    Attributes:
        requester: Requester
        record_selector: HttpSelector
    """

    requester: Requester
    record_selector: HttpSelector

    def transform(
        self,
        record: Dict[str, Any],
        config: Optional[Config] = None,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
    ) -> None:
        additional_data_response = self.requester.send_request(
            stream_slice=StreamSlice(partition={"parent_id": record["id"]}, cursor_slice={})
        )
        additional_data = self.record_selector.select_records(response=additional_data_response, stream_state={}, records_schema={})

        for data in additional_data:
            record.update(data)


@dataclass
class MarketingEmailStatisticsTransformation(RecordTransformation):
    """
    Custom transformation for HubSpot Marketing Emails that fetches statistics from the v3 API.

    This transformation is needed because the v3 API separates email data and statistics into two endpoints:
    - GET /marketing/v3/emails - for email data
    - GET /marketing/v3/emails/{emailId}/statistics - for statistics

    This transformation fetches statistics for each email and merges them into the email record.
    """

    requester: Requester
    record_selector: HttpSelector

    def transform(
        self,
        record: Dict[str, Any],
        config: Optional[Config] = None,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
    ) -> None:
        try:
            # Fetch statistics for this email using the v3 statistics endpoint
            statistics_response = self.requester.send_request(
                stream_slice=StreamSlice(partition={"email_id": record["id"]}, cursor_slice={})
            )
            statistics_data = self.record_selector.select_records(response=statistics_response, stream_state={}, records_schema={})

            # Merge statistics into the email record
            for stats in statistics_data:
                record.update(stats)

        except Exception as e:
            # Log the error but don't fail the entire sync.
            # This ensures that if statistics are unavailable for some emails,
            # we still get the email data.
            logger.warning(f"Failed to fetch statistics for email {record.get('id', 'unknown')}: {str(e)}")


@dataclass
class HubspotSchemaExtractor(RecordExtractor):
    """
    Record extractor that encapsulates the list of properties under a single object because DynamicSchemaLoader only
    accepts the set of dynamic schema fields as a single record.
    This might be doable with the existing DpathExtractor configuration.
    """

    config: Config
    parameters: InitVar[Mapping[str, Any]]
    decoder: Decoder = field(default_factory=lambda: JsonDecoder(parameters={}))

    def extract_records(self, response: requests.Response) -> Iterable[Mapping[str, Any]]:
        yield {"properties": list(self.decoder.decode(response))}


@dataclass
class HubspotRenamePropertiesTransformation(RecordTransformation):
    """
    Custom transformation that takes in a record that represents a map of all dynamic properties retrieved
    from the Hubspot properties endpoint. The transformation nests all of these fields under a sub-object called
    `properties` and also adds each property back at the top level under a `properties_<property_name>` key.
    """

    def transform(
        self,
        record: Dict[str, Any],
        config: Optional[Config] = None,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
    ) -> None:
        transformed_record = {
            "properties": {
                "type": "object",
                "properties": {},
            }
        }
        for key, value in record.items():
            transformed_record["properties"]["properties"][key] = value
            updated_key = f"properties_{key}"
            transformed_record[updated_key] = value

        record.clear()
        record.update(transformed_record)


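# Illustrative input/output for HubspotRenamePropertiesTransformation (a sketch; the property name is hypothetical):
#
#   input:  {"email": {"type": ["null", "string"]}}
#   output: {
#       "properties": {"type": "object", "properties": {"email": {"type": ["null", "string"]}}},
#       "properties_email": {"type": ["null", "string"]},
#   }
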
class EngagementsHttpRequester(HttpRequester):
    """
    The Engagements stream uses different endpoints:
    - Engagements Recent, if the start date/state is within the last 30 days and the API is able to return all records (<10k), or
    - Engagements All, which extracts all records but relies on filtering on the connector side

    Recent Engagements API:
    https://legacydocs.hubspot.com/docs/methods/engagements/get-recent-engagements

    Important: This endpoint returns only the 10k most recently updated records from the last 30 days.

    All Engagements API:
    https://legacydocs.hubspot.com/docs/methods/engagements/get-all-engagements

    Important:

    1. The stream is declared to use one stream slice from the start date (default/config/state) to now, and it doesn't have a step.
       Based on this we can use stream_slice["start_time"] and be sure that it is equal to the value in the initial state.
       stream_slice["start_time"] is used to decide whether to use the Recent API; concurrent processing of date windows is
       incompatible with this, which is why the stream does not use a step.
    2. The stream is declared to use 250 as the page size param in pagination.
       The Recent Engagements API has a maximum of 100 for this param, but it doesn't fail if a bigger value is provided and falls back to 100 as the default.
    3. The stream has is_client_side_incremental=true to filter the Engagements All response.
    """

    recent_api_total_records_limit = 10000
    recent_api_last_days_limit = 29

    recent_api_path = "/engagements/v1/engagements/recent/modified"
    all_api_path = "/engagements/v1/engagements/paged"

    _use_recent_api = None

    def should_use_recent_api(self, stream_slice: StreamSlice) -> bool:
        if self._use_recent_api is not None:
            return self._use_recent_api

        # The Recent Engagements API returns records updated in the last 30 days only. If the start time is older,
        # the All Engagements API should be used.
        if int(stream_slice["start_time"]) >= int(
            DatetimeParser().format((ab_datetime_now() - timedelta(days=self.recent_api_last_days_limit)), "%ms")
        ):
            # The Recent Engagements API returns only the 10k most recently updated records.
            # If the API response indicates that there are more records, the All Engagements API should be used.
            _, response = self._http_client.send_request(
                http_method=self.get_method().value,
                url=self._join_url(self.get_url_base(), self.recent_api_path),
                headers=self._request_headers({}, stream_slice, {}, {}),
                params={"count": 250, "since": stream_slice["start_time"]},
                request_kwargs={"stream": self.stream_response},
            )
            if response.json().get("total") <= self.recent_api_total_records_limit:
                self._use_recent_api = True
            else:
                self._use_recent_api = False

        return self._use_recent_api

    def get_path(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> str:
        if self.should_use_recent_api(stream_slice):
            return self.recent_api_path
        return self.all_api_path

    def get_request_params(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> MutableMapping[str, Any]:
        request_params = self._request_options_provider.get_request_params(
            stream_state=stream_state,
            stream_slice=stream_slice,
            next_page_token=next_page_token,
        )
        if self.should_use_recent_api(stream_slice):
            request_params.update({"since": stream_slice["start_time"]})
        return request_params


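# Illustrative endpoint selection (a sketch of should_use_recent_api above, not an additional code path):
#
#   start_time within the last 29 days AND the Recent API reports total <= 10,000
#       -> GET /engagements/v1/engagements/recent/modified (with a "since" request param)
#   otherwise
#       -> GET /engagements/v1/engagements/paged
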
class EntitySchemaNormalization(TypeTransformer):
    """
    For CRM object and CRM Search streams, which have dynamic schemas, custom normalization should be applied.
    Converts a record's received value according to its declared catalog dynamic schema type and format.

    Empty strings for fields with a non-string type are converted to None.
    Numeric strings for fields with a number type are converted to int when integral, otherwise to float.
    Strings like "true"/"false" for fields with a boolean type are converted to booleans.
    Date and datetime fields are converted to formatted datetime strings. Set __ab_apply_cast_datetime: false in the field definition if you don't need to format datetime strings.
    """

    def __init__(self, *args, **kwargs):
        config = TransformConfig.CustomSchemaNormalization
        super().__init__(config)
        self.registerCustomTransform(self.get_transform_function())

    def get_transform_function(self):
        def transform_function(original_value: str, field_schema: Dict[str, Any]) -> Any:
            target_type = field_schema.get("type")
            target_format = field_schema.get("format")

            if "null" in target_type:
                if original_value is None:
                    return original_value
                # Sometimes HubSpot outputs an empty string for a field that has a format set.
                # Set it to None to avoid errors at the destination's normalization stage.
                if target_format and original_value == "":
                    return None

            if isinstance(original_value, str):
                if "string" not in target_type and original_value == "":
                    # do not cast empty strings, return None instead to be properly cast.
                    transformed_value = None
                    return transformed_value
                if "number" in target_type:
                    # do not cast numeric IDs into float, use integer instead
                    target_type = int if original_value.isnumeric() else float

                    # In some cases, the returned value from HubSpot is non-numeric despite the discovered schema explicitly declaring a numeric type.
                    # For example, a field with a type of "number" might return a string: "3092727991;3881228353;15895321999"
                    # So, we attempt to cast the value to the declared type, and failing that, we log the error and return the original value.
                    # This matches the previous behavior in the Python implementation.
                    try:
                        transformed_value = target_type(original_value.replace(",", ""))
                        return transformed_value
                    except ValueError:
                        logger.exception(f"Could not cast field value {original_value} to {target_type}")
                        return original_value
                if "boolean" in target_type and original_value.lower() in ["true", "false"]:
                    transformed_value = str(original_value).lower() == "true"
                    return transformed_value
                if target_format:
                    if field_schema.get("__ab_apply_cast_datetime") is False:
                        return original_value
                    if "date" == target_format:
                        dt = EntitySchemaNormalization.convert_datetime_string_to_ab_datetime(original_value)
                        if dt:
                            transformed_value = DatetimeParser().format(dt, "%Y-%m-%d")
                            return transformed_value
                        else:
                            return original_value
                    if "date-time" == target_format:
                        dt = EntitySchemaNormalization.convert_datetime_string_to_ab_datetime(original_value)
                        if dt:
                            transformed_value = ab_datetime_format(dt)
                            return transformed_value
                        else:
                            return original_value
            if "properties" in field_schema and isinstance(original_value, dict):
                normalized_nested_properties = dict()
                for nested_key, nested_val in original_value.items():
                    nested_property_schema = field_schema.get("properties").get(nested_key)
                    if nested_property_schema:
                        normalized_nested_properties[nested_key] = transform_function(nested_val, nested_property_schema)
                    else:
                        normalized_nested_properties[nested_key] = nested_val
                return normalized_nested_properties
            else:
                return self.default_convert(original_value, field_schema)

        return transform_function

    @staticmethod
    def convert_datetime_string_to_ab_datetime(datetime_str: str) -> Optional[AirbyteDateTime]:
        """
        Implements the existing source-hubspot behavior where the API response can return a timestamp with either
        seconds or milliseconds precision. We first attempt to parse in seconds, then in milliseconds, and if the
        value is unparsable we log a warning. Returns None if the string could not be parsed into a datetime object,
        because the existing source emits the original value and logs a warning in that case.
        """
        if not datetime_str:
            return None

        # Hubspot sometimes returns datetime strings as a float which can cause an OverflowError. When a float
        # string is detected, the string is converted into an integer string before parsing
        try:
            float(datetime_str)
            if "." in datetime_str:
                datetime_str = datetime_str.split(".")[0]
        except ValueError:
            pass

        try:
            return ab_datetime_parse(datetime_str)
        except (ValueError, TypeError, OverflowError):
            pass

        try:
            return ab_datetime_parse(int(datetime_str) // 1000)
        except (ValueError, TypeError, OverflowError) as ex:
            logger.warning(f"Couldn't parse date/datetime string field. Timestamp field value: {datetime_str}. Ex: {ex}")

        return None


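# Illustrative conversions performed by EntitySchemaNormalization's transform function (a sketch with hypothetical values):
#
#   transform_function("25", {"type": ["null", "number"]})                            -> 25 (int, not float)
#   transform_function("", {"type": ["null", "number"]})                              -> None
#   transform_function("true", {"type": ["null", "boolean"]})                         -> True
#   transform_function("2024-05-01", {"type": ["null", "string"], "format": "date"})  -> "2024-05-01"
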
class HubspotFlattenAssociationsTransformation(RecordTransformation):
    """
    A record transformation that flattens the `associations` field in HubSpot records.

    This transformation takes a nested dictionary under the `associations` key and extracts the IDs
    of associated objects. The extracted lists of IDs are added as new top-level fields in the record,
    using the association name as the key (spaces replaced with underscores).

    Example:
        Input:
        {
            "id": 1,
            "associations": {
                "Contacts": {"results": [{"id": 101}, {"id": 102}]}
            }
        }
        Output:
        {
            "id": 1,
            "Contacts": [101, 102]
        }
    """

    def transform(
        self,
        record: Dict[str, Any],
        config: Optional[Config] = None,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
    ) -> None:
        if "associations" in record:
            associations = record.pop("associations")
            for name, association in associations.items():
                record[name.replace(" ", "_")] = [row["id"] for row in association.get("results", [])]


@dataclass
class HubspotAssociationsExtractor(RecordExtractor):
    """
    Custom extractor for HubSpot association-enriched records.
    This extractor:
    - Navigates a specified `field_path` within the JSON response to extract a list of primary entities.
    - Collects the record IDs to use in the associations retriever request body.
    - Uses a secondary retriever to fetch associated objects for each entity (based on the provided `associations_list`).
    - Merges associated object IDs back into each entity's record under the corresponding association name.
    Attributes:
        field_path: Path to the list of records in the API response.
        entity: The field used for the associations retriever endpoint.
        associations_list: List of associations to fetch (e.g., ["contacts", "companies"]).
    """

    field_path: List[Union[InterpolatedString, str]]
    entity: Union[InterpolatedString, str]
    associations_list: Union[List[str], Union[InterpolatedString, str]]
    config: Config
    parameters: InitVar[Mapping[str, Any]]
    decoder: Decoder = field(default_factory=lambda: JsonDecoder(parameters={}))

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        self._field_path = [InterpolatedString.create(path, parameters=parameters) for path in self.field_path]
        for path_index in range(len(self.field_path)):
            if isinstance(self.field_path[path_index], str):
                self._field_path[path_index] = InterpolatedString.create(self.field_path[path_index], parameters=parameters)

        self._entity = InterpolatedString.create(self.entity, parameters=parameters)

        # The list of associations can either be provided as a static list of constants or evaluated from an interpolated string
        if isinstance(self.associations_list, list):
            self._associations_list = self.associations_list
        else:
            self._associations_list = InterpolatedString.create(self.associations_list, parameters=parameters)

        self._associations_retriever = build_associations_retriever(
            associations_list=self._associations_list,
            parent_entity=self._entity,
            config=self.config,
        )

    def extract_records(self, response: requests.Response) -> Iterable[Mapping[str, Any]]:
        for body in self.decoder.decode(response):
            records = []
            if len(self._field_path) == 0:
                extracted = body
            else:
                path = [path.eval(self.config) for path in self._field_path]
                if "*" in path:
                    extracted = dpath.values(body, path)
                else:
                    extracted = dpath.get(body, path, default=[])  # type: ignore # extracted will be a MutableMapping, given input data structure
            if isinstance(extracted, list):
                records = extracted
            elif extracted:
                raise ValueError("field_path should always point towards a list field in the response body")

            # If no records were extracted, there is no need to call the associations retriever
            if not records:
                continue

            records_by_pk = {record["id"]: record for record in records}
            record_ids = [{"id": record["id"]} for record in records]

            slices = self._associations_retriever.stream_slicer.stream_slices()

            for _slice in slices:
                # Append the list of extracted records so they are usable during interpolation of the JSON request body
                stream_slice = StreamSlice(
                    cursor_slice=_slice.cursor_slice, partition=_slice.partition, extra_fields={"record_ids": record_ids}
                )
                logger.debug(f"Reading {_slice} associations of {self._entity.eval(config=self.config)}")
                associations = self._associations_retriever.read_records({}, stream_slice=stream_slice)
                for group in associations:
                    slice_value = stream_slice["association_name"]
                    current_record = records_by_pk[group["from"]["id"]]
                    associations_list = current_record.get(slice_value, [])
                    associations_list.extend(association["toObjectId"] for association in group["to"])
                    # Associations are defined in the schema as string ids but come in the API response as integer ids
                    current_record[slice_value] = [str(association) for association in associations_list]
            yield from records_by_pk.values()


def build_associations_retriever(
    *,
    associations_list: Union[List[str], InterpolatedString],
    parent_entity: InterpolatedString,
    config: Config,
) -> SimpleRetriever:
    """
    Instantiates a SimpleRetriever that makes requests against:
    POST /crm/v4/associations/{self.parent_entity}/{stream_slice.association}/batch/read

    The current architecture of the low-code framework makes it difficult to instantiate components
    in arbitrary locations within the manifest.yaml. For example, the only place where a SimpleRetriever
    can be instantiated is as a field of DeclarativeStream because the `model_to_component_factory.py.create_simple_retriever()`
    constructor takes incoming parameters from values of the DeclarativeStream.

    So we are unable to build the associations_retriever from within this custom HubspotAssociationsExtractor
    because we would be missing required parameters that are not supplied by the SimpleRetrieverModel,
    and we're left with the workaround of building the runtime components in this method.
    """

    parameters: Mapping[str, Any] = {}
    evaluated_entity = parent_entity.eval(config=config)

    if isinstance(associations_list, InterpolatedString):
        associations = associations_list.eval(config=config)
    else:
        associations = associations_list

    bearer_authenticator = BearerAuthenticator(
        token_provider=InterpolatedStringTokenProvider(
            api_token=config.get("credentials", {}).get("access_token", ""),
            config=config,
            parameters=parameters,
        ),
        config=config,
        parameters=parameters,
    )

    # Use default values to create the component if another authentication method is used.
    # If values are missing, it will fail in the parent stream.
    oauth_authenticator = DeclarativeOauth2Authenticator(
        config=config,
        parameters=parameters,
        client_id=config.get("credentials", {}).get("client_id", "client_id"),
        client_secret=config.get("credentials", {}).get("client_secret", "client_secret"),
        refresh_token=config.get("credentials", {}).get("refresh_token", "refresh_token"),
        token_refresh_endpoint="https://api.hubapi.com/oauth/v1/token",
    )

    authenticator = SelectiveAuthenticator(
        config,
        authenticators={"Private App Credentials": bearer_authenticator, "OAuth Credentials": oauth_authenticator},
        authenticator_selection_path=["credentials", "credentials_title"],
    )

    requester = HttpRequester(
        name="associations",
        url_base="https://api.hubapi.com",
        path=f"/crm/v4/associations/{evaluated_entity}/" + "{{ stream_partition['association_name'] }}/batch/read",
        http_method="POST",
        authenticator=authenticator,
        request_options_provider=InterpolatedRequestOptionsProvider(
            request_body_json={"inputs": "{{ stream_slice.extra_fields['record_ids'] }}"},
            config=config,
            parameters=parameters,
        ),
        error_handler=DefaultErrorHandler(
            backoff_strategies=[
                WaitTimeFromHeaderBackoffStrategy(header="Retry-After", config=config, parameters=parameters),
                ExponentialBackoffStrategy(config=config, parameters=parameters),
            ],
            response_filters=[
                HttpResponseFilter(
                    action="RETRY",
                    http_codes={429},
                    error_message="HubSpot rate limit reached (429). Backoff based on 'Retry-After' header, then exponential backoff fallback.",
                    config=config,
                    parameters=parameters,
                ),
                HttpResponseFilter(
                    action="RETRY",
                    http_codes={502, 503},
                    error_message="HubSpot server error (5xx). Retrying with exponential backoff...",
                    config=config,
                    parameters=parameters,
                ),
                HttpResponseFilter(
                    action="RETRY",
                    http_codes={401},
                    error_message="Authentication to HubSpot has expired. Authentication will be retried, but if this issue persists, re-authenticate to restore access to HubSpot.",
                    config=config,
                    parameters=parameters,
                ),
                HttpResponseFilter(
                    action="FAIL",
                    http_codes={530},
                    error_message="The user cannot be authorized with the provided credentials. Please verify that your credentials are valid and try again.",
                    config=config,
                    parameters=parameters,
                ),
                HttpResponseFilter(
                    action="FAIL",
                    http_codes={403},
                    error_message="Access denied (403). The authenticated user does not have permissions to access the resource.",
                    config=config,
                    parameters=parameters,
                ),
                HttpResponseFilter(
                    action="FAIL",
                    http_codes={400},
                    error_message="Bad request (400). Please verify your credentials and try again.",
                    config=config,
                    parameters=parameters,
                ),
            ],
            config=config,
            parameters=parameters,
        ),
        config=config,
        parameters=parameters,
    )

    # Partition the requests over the association names to fetch for the parent entity
    slicer = ListPartitionRouter(values=associations, cursor_field="association_name", config=config, parameters=parameters)

    selector = RecordSelector(
        extractor=DpathExtractor(field_path=["results"], config=config, parameters=parameters),
        schema_normalization=TypeTransformer(TransformConfig.NoTransform),
        record_filter=None,
        transformations=[],
        config=config,
        parameters=parameters,
    )

    return SimpleRetriever(
        name="associations",
        primary_key=None,
        requester=requester,
        record_selector=selector,
        paginator=None,  # batch/read never paginates
        stream_slicer=slicer,
        config=config,
        parameters=parameters,
    )


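# Illustrative request issued by the associations retriever built above (a sketch; the entity, association name,
# and ids are hypothetical):
#
#   POST https://api.hubapi.com/crm/v4/associations/deals/contacts/batch/read
#   {"inputs": [{"id": "1"}, {"id": "2"}]}
#
# Each returned group is then merged back into the matching parent record by HubspotAssociationsExtractor,
# e.g. record["contacts"] = ["101", "102"].
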
@dataclass
class HubspotCRMSearchPaginationStrategy(PaginationStrategy):
    """
    This pagination strategy functions similarly to the default cursor pagination strategy. The custom
    behavior accounts for Hubspot's /search API limitation that only allows a max of 10,000 total results
    for a query. Once we reach 10,000 records, we start a new query using the latest id collected.
    """

    page_size: int
    primary_key: str = "id"
    RECORDS_LIMIT = 10000

    @property
    def initial_token(self) -> Optional[Any]:
        return {"after": 0}

    def next_page_token(
        self,
        response: requests.Response,
        last_page_size: int,
        last_record: Optional[Record],
        last_page_token_value: Optional[Any] = None,
    ) -> Optional[Any]:
        # Hubspot documentation states that the search endpoints are limited to 10,000 total results
        # for any given query. Attempting to page beyond 10,000 will result in a 400 error.
        # https://developers.hubspot.com/docs/api/crm/search. We stop getting data at 10,000 and
        # start a new search query with the latest id that has been collected.
        if last_page_token_value and last_page_token_value.get("after", 0) + last_page_size >= self.RECORDS_LIMIT:
            return {"after": 0, "id": int(last_record[self.primary_key]) + 1}

        # Stop paginating when there are fewer records than the page size or the current page has no records
        if (last_page_size < self.page_size) or last_page_size == 0 or not response.json().get("paging"):
            return None

        last_id_of_previous_chunk = last_page_token_value.get("id")
        if last_id_of_previous_chunk:
            return {"after": last_page_token_value["after"] + last_page_size, self.primary_key: last_id_of_previous_chunk}
        else:
            return {"after": last_page_token_value["after"] + last_page_size}

    def get_page_size(self) -> Optional[int]:
        return self.page_size


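# Illustrative token progression for HubspotCRMSearchPaginationStrategy (a sketch assuming page_size=200 and
# hypothetical record ids):
#
#   initial_token                                         -> {"after": 0}
#   full page returned                                    -> {"after": 200}, {"after": 400}, ...
#   once "after" + last page size reaches 10,000          -> {"after": 0, "id": <last record id + 1>}  (new search query)
#   short or empty page, or no "paging" in the response   -> None  (stop paginating)
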
@dataclass
class HubspotCustomObjectsSchemaLoader(SchemaLoader):
    """
    Custom schema loader for HubSpot custom object streams.

    This class generates a JSON schema based on the properties defined in the manifest.
    These properties are injected into the parameters by the HttpComponentsResolver used within the DynamicDeclarativeStream.
    """

    config: Mapping[str, Any]
    parameters: InitVar[Mapping[str, Any]]

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        raw_schema_properties: List[Mapping[str, Any]] = parameters.get("schema_properties", [])
        properties = self._get_properties(raw_schema=raw_schema_properties)
        self._schema = self._generate_schema(properties)

    def _get_properties(self, raw_schema: List[Mapping[str, Any]]) -> Mapping[str, Any]:
        return {field["name"]: self._field_to_property_schema(field) for field in raw_schema}

    def _field_to_property_schema(self, field: Mapping[str, Any]) -> Mapping[str, Any]:
        field_type = field["type"]

        if field_type in ["string", "enumeration", "phone_number", "object_coordinates", "json"]:
            return {"type": ["null", "string"]}
        elif field_type == "datetime" or field_type == "date-time":
            return {"type": ["null", "string"], "format": "date-time"}
        elif field_type == "date":
            return {"type": ["null", "string"], "format": "date"}
        elif field_type == "number":
            return {"type": ["null", "number"]}
        elif field_type == "boolean" or field_type == "bool":
            return {"type": ["null", "boolean"]}
        else:
            logger.warning(f"Field {field['name']} has unrecognized type: {field['type']}; casting to string.")
            return {"type": ["null", "string"]}

    def _generate_schema(self, properties: Mapping[str, Any]) -> Mapping[str, Any]:
        unnested_properties = {f"properties_{property_name}": property_value for (property_name, property_value) in properties.items()}
        schema = {
            "$schema": "http://json-schema.org/draft-07/schema#",
            "type": ["null", "object"],
            "additionalProperties": True,
            "properties": {
                "id": {"type": ["null", "string"]},
                "createdAt": {"type": ["null", "string"], "format": "date-time"},
                "updatedAt": {"type": ["null", "string"], "format": "date-time"},
                "archived": {"type": ["null", "boolean"]},
                "properties": {"type": ["null", "object"], "properties": properties},
                **unnested_properties,
            },
        }

        return schema

    def get_json_schema(self) -> Mapping[str, Any]:
        return self._schema


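# Illustrative mapping performed by _field_to_property_schema (a sketch; the field names are hypothetical):
#
#   {"name": "priority", "type": "enumeration"} -> {"type": ["null", "string"]}
#   {"name": "closed_on", "type": "date"}       -> {"type": ["null", "string"], "format": "date"}
#   {"name": "amount", "type": "number"}        -> {"type": ["null", "number"]}
#
# The generated schema exposes each property both nested under "properties" and unnested as "properties_<name>".
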
_TRUTHY_STRINGS = ("y", "yes", "t", "true", "on", "1")
_FALSEY_STRINGS = ("n", "no", "f", "false", "off", "0")


def _strtobool(value: str, /) -> int:
    """Mimic the behavior of distutils.util.strtobool.

    From: https://docs.python.org/2/distutils/apiref.html#distutils.util.strtobool

    > Convert a string representation of truth to true (1) or false (0).
    > True values are y, yes, t, true, on and 1; false values are n, no, f, false, off and 0. Raises
    > `ValueError` if val is anything else.
    """
    normalized_str = value.lower().strip()
    if normalized_str in _TRUTHY_STRINGS:
        return 1

    if normalized_str in _FALSEY_STRINGS:
        return 0

    raise ValueError(f"Invalid boolean value: {normalized_str}")
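
# Illustrative usage of _strtobool (a sketch):
#
#   _strtobool("Yes")    -> 1
#   _strtobool(" off ")  -> 0
#   _strtobool("maybe")  -> raises ValueError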