#
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
#
import logging
from dataclasses import InitVar, dataclass, field
from datetime import timedelta
from typing import Any, Dict, Iterable, List, Mapping, MutableMapping, Optional, Union
import dpath
import requests
from airbyte_cdk import (
BearerAuthenticator,
DpathExtractor,
RecordSelector,
SimpleRetriever,
)
from airbyte_cdk.sources.declarative.auth.oauth import DeclarativeOauth2Authenticator
from airbyte_cdk.sources.declarative.auth.selective_authenticator import SelectiveAuthenticator
from airbyte_cdk.sources.declarative.auth.token_provider import InterpolatedStringTokenProvider
from airbyte_cdk.sources.declarative.datetime.datetime_parser import DatetimeParser
from airbyte_cdk.sources.declarative.decoders import Decoder, JsonDecoder
from airbyte_cdk.sources.declarative.extractors.http_selector import HttpSelector
from airbyte_cdk.sources.declarative.extractors.record_extractor import RecordExtractor
from airbyte_cdk.sources.declarative.interpolation import InterpolatedString
from airbyte_cdk.sources.declarative.migrations.state_migration import StateMigration
from airbyte_cdk.sources.declarative.partition_routers.list_partition_router import ListPartitionRouter
from airbyte_cdk.sources.declarative.requesters import HttpRequester
from airbyte_cdk.sources.declarative.requesters.error_handlers.backoff_strategies import (
ExponentialBackoffStrategy,
WaitTimeFromHeaderBackoffStrategy,
)
from airbyte_cdk.sources.declarative.requesters.error_handlers.default_error_handler import DefaultErrorHandler
from airbyte_cdk.sources.declarative.requesters.error_handlers.http_response_filter import HttpResponseFilter
from airbyte_cdk.sources.declarative.requesters.paginators.strategies.pagination_strategy import PaginationStrategy
from airbyte_cdk.sources.declarative.requesters.request_options import InterpolatedRequestOptionsProvider
from airbyte_cdk.sources.declarative.requesters.requester import Requester
from airbyte_cdk.sources.declarative.schema.schema_loader import SchemaLoader
from airbyte_cdk.sources.declarative.transformations import RecordTransformation
from airbyte_cdk.sources.types import Config, Record, StreamSlice, StreamState
from airbyte_cdk.sources.utils.transform import TransformConfig, TypeTransformer
from airbyte_cdk.utils.datetime_helpers import AirbyteDateTime, ab_datetime_format, ab_datetime_now, ab_datetime_parse
logger = logging.getLogger("airbyte")
@dataclass
class NewtoLegacyFieldTransformation(RecordTransformation):
"""
Implements a custom transformation which adds the legacy field equivalent of v2 fields for streams which contain Deals and Contacts entities.
This custom implementation was developed in lieu of the AddFields component due to the dynamic nature of the record properties for the HubSpot source.
For example:
hs_v2_date_exited_{stage_id} -> hs_date_exited_{stage_id} where {stage_id} is a user-generated value
"""
field_mapping: Mapping[str, str]
def transform(
self,
record_or_schema: Dict[str, Any],
config: Optional[Config] = None,
stream_state: Optional[StreamState] = None,
stream_slice: Optional[StreamSlice] = None,
) -> None:
"""
Transform a record or schema in place by adding, for each v2 field, the corresponding legacy field so existing syncs are not broken.
:param record_or_schema: The input record or schema to be transformed.
"""
is_record = record_or_schema.get("properties") is not None
for field, value in list(record_or_schema.get("properties", record_or_schema).items()):
for legacy_field, new_field in self.field_mapping.items():
if new_field in field:
transformed_field = field.replace(new_field, legacy_field)
if legacy_field == "hs_lifecyclestage_" and not transformed_field.endswith("_date"):
transformed_field += "_date"
if is_record:
if record_or_schema["properties"].get(transformed_field) is None:
record_or_schema["properties"][transformed_field] = value
else:
if record_or_schema.get(transformed_field) is None:
record_or_schema[transformed_field] = value
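# Illustrative usage sketch (not part of the connector code); the mapping below is a
# hypothetical example of how field_mapping is expected to be configured:
#
#   transformation = NewtoLegacyFieldTransformation(field_mapping={"hs_date_exited_": "hs_v2_date_exited_"})
#   record = {"properties": {"hs_v2_date_exited_stage123": "2024-01-01T00:00:00Z"}}
#   transformation.transform(record)
#   # record["properties"] now also contains "hs_date_exited_stage123": "2024-01-01T00:00:00Z"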
class MigrateEmptyStringState(StateMigration):
cursor_field: str
config: Config
cursor_format: Optional[str] = None
def __init__(self, cursor_field, config: Config, cursor_format: Optional[str] = None):
self.cursor_field = cursor_field
self.cursor_format = cursor_format
self.config = config
def migrate(self, stream_state: Mapping[str, Any]) -> Mapping[str, Any]:
# If a start date wasn't provided in the config, the default date will be used
start_date = self.config.get("start_date", "2006-06-01T00:00:00.000Z")
if self.cursor_format:
dt = ab_datetime_parse(start_date)
formatted_start_date = DatetimeParser().format(dt, self.cursor_format)
return {self.cursor_field: formatted_start_date}
return {self.cursor_field: start_date}
def should_migrate(self, stream_state: Mapping[str, Any]) -> bool:
return stream_state.get(self.cursor_field) == ""
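# Illustrative usage sketch (not part of the connector code); the cursor field and start_date
# values are hypothetical:
#
#   migration = MigrateEmptyStringState(cursor_field="updatedAt", config={"start_date": "2021-01-01T00:00:00.000Z"})
#   migration.should_migrate({"updatedAt": ""})  # -> True
#   migration.migrate({"updatedAt": ""})         # -> {"updatedAt": "2021-01-01T00:00:00.000Z"}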
@dataclass
class HubspotPropertyHistoryExtractor(RecordExtractor):
"""
Custom record extractor which parses the JSON response from Hubspot and for each instance returned for the specified
object type (ex. Contacts, Deals, etc.), yields records for every requested property. Because this is a property
history stream, an individual property can yield multiple records representing the previous version of that property.
The custom behavior of this component is:
- Iterating over and extracting property history instances as individual records
- Injecting fields from outer levels of the response into yielded records to be used as primary keys
"""
field_path: List[Union[InterpolatedString, str]]
entity_primary_key: str
additional_keys: Optional[List[str]]
config: Config
parameters: InitVar[Mapping[str, Any]]
decoder: Decoder = field(default_factory=lambda: JsonDecoder(parameters={}))
def __post_init__(self, parameters: Mapping[str, Any]) -> None:
self._field_path = [InterpolatedString.create(path, parameters=parameters) for path in self.field_path]
for path_index in range(len(self.field_path)):
if isinstance(self.field_path[path_index], str):
self._field_path[path_index] = InterpolatedString.create(self.field_path[path_index], parameters=parameters)
def extract_records(self, response: requests.Response) -> Iterable[Mapping[str, Any]]:
for body in self.decoder.decode(response):
results = []
if len(self._field_path) == 0:
extracted = body
else:
path = [path.eval(self.config) for path in self._field_path]
if "*" in path:
extracted = dpath.values(body, path)
else:
extracted = dpath.get(body, path, default=[]) # type: ignore # extracted will be a MutableMapping, given input data structure
if isinstance(extracted, list):
results = extracted
elif extracted:
raise ValueError(f"field_path should always point towards a list field in the response body for property_history streams")
for result in results:
properties_with_history = result.get("propertiesWithHistory")
primary_key = result.get("id")
additional_keys = (
{additional_key: result.get(additional_key) for additional_key in self.additional_keys} if self.additional_keys else {}
)
if properties_with_history:
for property_name, value_dict in properties_with_history.items():
if property_name == "hs_lastmodifieddate":
# Skipping hs_lastmodifieddate since it is updated whenever any field of a record
# changes, regardless of which field changed. Including it would only create
# overhead: every changed property already carries the date it was changed,
# plus there would be a redundant change entry for the lastmodifieddate field itself.
continue
for version in value_dict:
version["property"] = property_name
version[self.entity_primary_key] = primary_key
yield version | additional_keys
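# Illustrative sketch of the extraction (not part of the connector code), assuming
# field_path=["results"] and entity_primary_key="deal_id". For a response body like
#   {"results": [{"id": "1", "propertiesWithHistory": {"dealstage": [{"value": "won"}, {"value": "open"}]}}]}
# the extractor yields one record per property version:
#   {"value": "won", "property": "dealstage", "deal_id": "1"}
#   {"value": "open", "property": "dealstage", "deal_id": "1"}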
@dataclass
class AddFieldsFromEndpointTransformation(RecordTransformation):
"""
Makes a request to the provided endpoint and updates the record with the retrieved data.
Attributes:
requester: Requester used to fetch the additional data.
record_selector: HttpSelector used to extract records from the response.
"""
requester: Requester
record_selector: HttpSelector
def transform(
self,
record: Dict[str, Any],
config: Optional[Config] = None,
stream_state: Optional[StreamState] = None,
stream_slice: Optional[StreamSlice] = None,
) -> None:
additional_data_response = self.requester.send_request(
stream_slice=StreamSlice(partition={"parent_id": record["id"]}, cursor_slice={})
)
additional_data = self.record_selector.select_records(response=additional_data_response, stream_state={}, records_schema={})
for data in additional_data:
record.update(data)
@dataclass
class MarketingEmailStatisticsTransformation(RecordTransformation):
"""
Custom transformation for HubSpot Marketing Emails that fetches statistics from the v3 API.
This transformation is needed because the v3 API separates email data and statistics into two endpoints:
- GET /marketing/v3/emails - for email data
- GET /marketing/v3/emails/{emailId}/statistics - for statistics
This transformation fetches statistics for each email and merges them into the email record.
"""
requester: Requester
record_selector: HttpSelector
def transform(
self,
record: Dict[str, Any],
config: Optional[Config] = None,
stream_state: Optional[StreamState] = None,
stream_slice: Optional[StreamSlice] = None,
) -> None:
try:
# Fetch statistics for this email using the v3 statistics endpoint
statistics_response = self.requester.send_request(
stream_slice=StreamSlice(partition={"email_id": record["id"]}, cursor_slice={})
)
statistics_data = self.record_selector.select_records(response=statistics_response, stream_state={}, records_schema={})
# Merge statistics into the email record
for stats in statistics_data:
record.update(stats)
except Exception as e:
# Log the error but don't fail the entire sync. This ensures that if statistics
# are unavailable for some emails, we still get the email data.
logger.warning(f"Failed to fetch statistics for email {record.get('id', 'unknown')}: {str(e)}")
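# Illustrative usage sketch (not part of the connector code); `stats_requester` and `selector`
# stand in for a Requester/HttpSelector configured against the v3 statistics endpoint:
#
#   transformation = MarketingEmailStatisticsTransformation(requester=stats_requester, record_selector=selector)
#   record = {"id": "123", "name": "Newsletter"}
#   transformation.transform(record)
#   # record is updated in place with whatever statistics fields were returned for email "123";
#   # if the statistics request fails, a warning is logged and the email record is kept as-is.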
@dataclass
class HubspotSchemaExtractor(RecordExtractor):
"""
Record extractor that encapsulates the list of properties under a single object because DynamicSchemaLoader only
accepts the set of dynamic schema fields as a single record.
This might be doable with the existing DpathExtractor configuration.
"""
config: Config
parameters: InitVar[Mapping[str, Any]]
decoder: Decoder = field(default_factory=lambda: JsonDecoder(parameters={}))
def extract_records(self, response: requests.Response) -> Iterable[Mapping[str, Any]]:
yield {"properties": list(self.decoder.decode(response))}
@dataclass
class HubspotRenamePropertiesTransformation(RecordTransformation):
"""
Custom transformation that takes in a record representing a map of all dynamic properties retrieved
from the Hubspot properties endpoint. The transformation nests all of these fields under a sub-object called
`properties` and also adds each property at the top level under a field name prefixed with
`properties_`, i.e. `properties_<property_name>`.
"""
def transform(
self,
record: Dict[str, Any],
config: Optional[Config] = None,
stream_state: Optional[StreamState] = None,
stream_slice: Optional[StreamSlice] = None,
) -> None:
transformed_record = {
"properties": {
"type": "object",
"properties": {},
}
}
for key, value in record.items():
transformed_record["properties"]["properties"][key] = value
updated_key = f"properties_{key}"
transformed_record[updated_key] = value
record.clear()
record.update(transformed_record)
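# Illustrative sketch (not part of the connector code): a dynamic-properties record such as
#   {"dealname": {"type": ["null", "string"]}}
# is rewritten in place to
#   {"properties": {"type": "object", "properties": {"dealname": {"type": ["null", "string"]}}},
#    "properties_dealname": {"type": ["null", "string"]}}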
class EngagementsHttpRequester(HttpRequester):
"""
Engagements stream uses different endpoints:
- Engagements Recent, if the start date/state is within the last 30 days and the API is able to return all records (<10k), or
- Engagements All, which extracts all records but only supports filtering on the connector side
Recent Engagements API:
https://legacydocs.hubspot.com/docs/methods/engagements/get-recent-engagements
Important: This endpoint returns only last 10k most recently updated records in the last 30 days.
All Engagements API:
https://legacydocs.hubspot.com/docs/methods/engagements/get-all-engagements
Important:
1. The stream is declared to use one stream slice from the start date (default/config/state) to now, with no step.
Based on this we can use stream_slice["start_time"] and be sure that it equals the value in the initial state.
Stream slice ["start_time"] is used to define _use_recent_api; concurrent processing of date windows is incompatible with this and therefore a step is not supported.
2. The stream is declared to use 250 as the page size param in pagination.
The Recent Engagements API has a maximum of 100 for this param, but it doesn't fail if a bigger value is provided and falls back to 100 as the default.
3. The stream has is_client_side_incremental=true to filter the Engagements All response.
"""
recent_api_total_records_limit = 10000
recent_api_last_days_limit = 29
recent_api_path = "/engagements/v1/engagements/recent/modified"
all_api_path = "/engagements/v1/engagements/paged"
_use_recent_api = None
def should_use_recent_api(self, stream_slice: StreamSlice) -> bool:
if self._use_recent_api is not None:
return self._use_recent_api
# The Recent Engagements API returns records updated in the last 30 days only. If the start time is older, the All Engagements API should be used
if int(stream_slice["start_time"]) >= int(
DatetimeParser().format((ab_datetime_now() - timedelta(days=self.recent_api_last_days_limit)), "%ms")
):
# The Recent Engagements API returns only the 10k most recently updated records.
# If the API response indicates that there are more records, the All Engagements API should be used
_, response = self._http_client.send_request(
http_method=self.get_method().value,
url=self._join_url(self.get_url_base(), self.recent_api_path),
headers=self._request_headers({}, stream_slice, {}, {}),
params={"count": 250, "since": stream_slice["start_time"]},
request_kwargs={"stream": self.stream_response},
)
if response.json().get("total") <= self.recent_api_total_records_limit:
self._use_recent_api = True
else:
self._use_recent_api = False
return self._use_recent_api
def get_path(
self,
*,
stream_state: Optional[StreamState] = None,
stream_slice: Optional[StreamSlice] = None,
next_page_token: Optional[Mapping[str, Any]] = None,
) -> str:
if self.should_use_recent_api(stream_slice):
return self.recent_api_path
return self.all_api_path
def get_request_params(
self,
*,
stream_state: Optional[StreamState] = None,
stream_slice: Optional[StreamSlice] = None,
next_page_token: Optional[Mapping[str, Any]] = None,
) -> MutableMapping[str, Any]:
request_params = self._request_options_provider.get_request_params(
stream_state=stream_state,
stream_slice=stream_slice,
next_page_token=next_page_token,
)
if self.should_use_recent_api(stream_slice):
request_params.update({"since": stream_slice["start_time"]})
return request_params
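# Illustrative sketch of the endpoint selection (not part of the connector code); `reported_total`
# is a hypothetical name for the "total" field of the probing request's response:
#
#   cutoff_ms = int(DatetimeParser().format(ab_datetime_now() - timedelta(days=29), "%ms"))
#   use_recent_api = int(stream_slice["start_time"]) >= cutoff_ms and reported_total <= 10_000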
class EntitySchemaNormalization(TypeTransformer):
"""
For CRM object and CRM Search streams, which have dynamic schemas, custom normalization should be applied.
Converts each record's received value according to its declared type and format in the dynamic catalog schema:
- Empty strings for fields with a non-string type are converted to None.
- Numeric strings for fields with a number type are converted to integers when integral, otherwise to floats.
- Strings like "true"/"false" for fields with a boolean type are converted to booleans.
- Date and datetime fields are converted to formatted datetime strings. Set __ab_apply_cast_datetime: false in the field definition if you don't need datetime strings to be formatted.
"""
def __init__(self, *args, **kwargs):
config = TransformConfig.CustomSchemaNormalization
super().__init__(config)
self.registerCustomTransform(self.get_transform_function())
def get_transform_function(self):
def transform_function(original_value: str, field_schema: Dict[str, Any]) -> Any:
target_type = field_schema.get("type")
target_format = field_schema.get("format")
if "null" in target_type:
if original_value is None:
return original_value
# Sometimes HubSpot outputs an empty string for a field with a format set.
# Set it to null to avoid errors during the destination's normalization stage.
if target_format and original_value == "":
return None
if isinstance(original_value, str):
if "string" not in target_type and original_value == "":
# Do not cast empty strings; return None instead so the value can be handled properly downstream.
transformed_value = None
return transformed_value
if "number" in target_type:
# do not cast numeric IDs into float, use integer instead
target_type = int if original_value.isnumeric() else float
# In some cases, the returned value from Hubspot is non-numeric despite the discovered schema explicitly declaring a numeric type.
# For example, a field with a type of "number" might return a string: "3092727991;3881228353;15895321999"
# So, we attempt to cast the value to the declared type, and failing that, we log the error and return the original value.
# This matches the previous behavior in the Python implementation.
try:
transformed_value = target_type(original_value.replace(",", ""))
return transformed_value
except ValueError:
logger.exception(f"Could not cast field value {original_value} to {target_type}")
return original_value
if "boolean" in target_type and original_value.lower() in ["true", "false"]:
transformed_value = str(original_value).lower() == "true"
return transformed_value
if target_format:
if field_schema.get("__ab_apply_cast_datetime") is False:
return original_value
if "date" == target_format:
dt = EntitySchemaNormalization.convert_datetime_string_to_ab_datetime(original_value)
if dt:
transformed_value = DatetimeParser().format(dt, "%Y-%m-%d")
return transformed_value
else:
return original_value
if "date-time" == target_format:
dt = EntitySchemaNormalization.convert_datetime_string_to_ab_datetime(original_value)
if dt:
transformed_value = ab_datetime_format(dt)
return transformed_value
else:
return original_value
if "properties" in field_schema and isinstance(original_value, dict):
normalized_nested_properties = dict()
for nested_key, nested_val in original_value.items():
nested_property_schema = field_schema.get("properties").get(nested_key)
if nested_property_schema:
normalized_nested_properties[nested_key] = transform_function(nested_val, nested_property_schema)
else:
normalized_nested_properties[nested_key] = nested_val
return normalized_nested_properties
else:
return self.default_convert(original_value, field_schema)
return transform_function
@staticmethod
def convert_datetime_string_to_ab_datetime(datetime_str: str) -> Optional[AirbyteDateTime]:
"""
Implements the existing source-hubspot behavior where the API response can return a timestamp
with either seconds or milliseconds precision. We first attempt to parse in seconds, then in milliseconds,
and if the value is unparsable we log a warning. Returns None if the string could not be parsed into a
datetime object, in which case the caller emits the original value unchanged.
"""
if not datetime_str:
return None
# Hubspot sometimes returns datetime strings as a float which can cause an OverflowError. When a float
# string is detected, the string is converted into an integer string before parsing
try:
float(datetime_str)
if "." in datetime_str:
datetime_str = datetime_str.split(".")[0]
except ValueError:
pass
try:
return ab_datetime_parse(datetime_str)
except (ValueError, TypeError, OverflowError):
pass
try:
return ab_datetime_parse(int(datetime_str) // 1000)
except (ValueError, TypeError, OverflowError) as ex:
logger.warning(f"Couldn't parse date/datetime string field. Timestamp field value: {datetime_str}. Ex: {ex}")
return None
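# Illustrative sketch of the normalization rules (not part of the connector code); expected
# results under the logic above, using hypothetical field schemas:
#
#   transform = EntitySchemaNormalization().get_transform_function()
#   transform("", {"type": ["null", "number"]})                               # -> None
#   transform("42", {"type": ["null", "number"]})                             # -> 42
#   transform("true", {"type": ["null", "boolean"]})                          # -> True
#   transform("1640995200", {"type": ["null", "string"], "format": "date"})   # -> "2022-01-01"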
class HubspotFlattenAssociationsTransformation(RecordTransformation):
"""
A record transformation that flattens the `associations` field in HubSpot records.
This transformation takes a nested dictionary under the `associations` key and extracts the IDs
of associated objects. The extracted lists of IDs are added as new top-level fields in the record,
using the association name as the key (spaces replaced with underscores).
Example:
Input:
{
"id": 1,
"associations": {
"Contacts": {"results": [{"id": 101}, {"id": 102}]}
}
}
Output:
{
"id": 1,
"Contacts": [101, 102]
}
"""
def transform(
self,
record: Dict[str, Any],
config: Optional[Config] = None,
stream_state: Optional[StreamState] = None,
stream_slice: Optional[StreamSlice] = None,
) -> None:
if "associations" in record:
associations = record.pop("associations")
for name, association in associations.items():
record[name.replace(" ", "_")] = [row["id"] for row in association.get("results", [])]
@dataclass
class HubspotAssociationsExtractor(RecordExtractor):
"""
Custom extractor for HubSpot association-enriched records.
This extractor:
- Navigates a specified `field_path` within the JSON response to extract a list of primary entities.
- Gets record IDs to use in the associations retriever request body.
- Uses a secondary retriever to fetch associated objects for each entity (based on provided `associations_list`).
- Merges associated object IDs back into each entity's record under the corresponding association name.
Attributes:
field_path: Path to the list of records in the API response.
entity: The field used for associations retriever endpoint.
associations_list: List of associations to fetch (e.g., ["contacts", "companies"]).
"""
field_path: List[Union[InterpolatedString, str]]
entity: Union[InterpolatedString, str]
associations_list: Union[List[str], Union[InterpolatedString, str]]
config: Config
parameters: InitVar[Mapping[str, Any]]
decoder: Decoder = field(default_factory=lambda: JsonDecoder(parameters={}))
def __post_init__(self, parameters: Mapping[str, Any]) -> None:
self._field_path = [InterpolatedString.create(path, parameters=parameters) for path in self.field_path]
for path_index in range(len(self.field_path)):
if isinstance(self.field_path[path_index], str):
self._field_path[path_index] = InterpolatedString.create(self.field_path[path_index], parameters=parameters)
self._entity = InterpolatedString.create(self.entity, parameters=parameters)
# The list of associations can either be provided as a static list of constants or evaluated from an interpolated string
if isinstance(self.associations_list, list):
self._associations_list = self.associations_list
else:
self._associations_list = InterpolatedString.create(self.associations_list, parameters=parameters)
self._associations_retriever = build_associations_retriever(
associations_list=self._associations_list,
parent_entity=self._entity,
config=self.config,
)
def extract_records(self, response: requests.Response) -> Iterable[Mapping[str, Any]]:
for body in self.decoder.decode(response):
records = []
if len(self._field_path) == 0:
extracted = body
else:
path = [path.eval(self.config) for path in self._field_path]
if "*" in path:
extracted = dpath.values(body, path)
else:
extracted = dpath.get(body, path, default=[]) # type: ignore # extracted will be a MutableMapping, given input data structure
if isinstance(extracted, list):
records = extracted
elif extracted:
raise ValueError(f"field_path should always point towards a list field in the response body")
# If no records were extracted, no need to call the associations retriever
if not records:
continue
records_by_pk = {record["id"]: record for record in records}
record_ids = [{"id": record["id"]} for record in records]
slices = self._associations_retriever.stream_slicer.stream_slices()
for _slice in slices:
# Append the list of extracted records so they are usable during interpolation of the JSON request body
stream_slice = StreamSlice(
cursor_slice=_slice.cursor_slice, partition=_slice.partition, extra_fields={"record_ids": record_ids}
)
logger.debug(f"Reading {_slice} associations of {self._entity.eval(config=self.config)}")
associations = self._associations_retriever.read_records({}, stream_slice=stream_slice)
for group in associations:
slice_value = stream_slice["association_name"]
current_record = records_by_pk[group["from"]["id"]]
associations_list = current_record.get(slice_value, [])
associations_list.extend(association["toObjectId"] for association in group["to"])
# Associations are defined in the schema as string ids but come in the API response as integer ids
current_record[slice_value] = [str(association) for association in associations_list]
yield from records_by_pk.values()
def build_associations_retriever(
*,
associations_list: Union[List[str], InterpolatedString],
parent_entity: InterpolatedString,
config: Config,
) -> SimpleRetriever:
"""
Instantiates a SimpleRetriever that makes requests against:
POST /crm/v4/associations/{parent_entity}/{association_name}/batch/read
The current architecture of the low-code framework makes it difficult to instantiate components
in arbitrary locations within the manifest.yaml. For example, the only place where a SimpleRetriever
can be instantiated is as a field of a DeclarativeStream, because `create_simple_retriever()` in
`model_to_component_factory.py` takes its incoming parameters from the values of the DeclarativeStream.
So we are unable to build the associations_retriever from within this custom HubspotAssociationsExtractor,
because we would be missing required parameters that are not supplied by the SimpleRetrieverModel,
and we're left with the workaround of building the runtime components in this method.
"""
parameters: Mapping[str, Any] = {}
evaluated_entity = parent_entity.eval(config=config)
if isinstance(associations_list, InterpolatedString):
associations = associations_list.eval(config=config)
else:
associations = associations_list
bearer_authenticator = BearerAuthenticator(
token_provider=InterpolatedStringTokenProvider(
api_token=config.get("credentials", {}).get("access_token", ""),
config=config,
parameters=parameters,
),
config=config,
parameters=parameters,
)
# Use default values to create the component if another authentication method is used.
# If the required values are missing, authentication will fail in the parent stream
oauth_authenticator = DeclarativeOauth2Authenticator(
config=config,
parameters=parameters,
client_id=config.get("credentials", {}).get("client_id", "client_id"),
client_secret=config.get("credentials", {}).get("client_secret", "client_secret"),
refresh_token=config.get("credentials", {}).get("refresh_token", "refresh_token"),
token_refresh_endpoint="https://api.hubapi.com/oauth/v1/token",
)
authenticator = SelectiveAuthenticator(
config,
authenticators={"Private App Credentials": bearer_authenticator, "OAuth Credentials": oauth_authenticator},
authenticator_selection_path=["credentials", "credentials_title"],
)
requester = HttpRequester(
name="associations",
url_base="https://api.hubapi.com",
path=f"/crm/v4/associations/{evaluated_entity}/" + "{{ stream_partition['association_name'] }}/batch/read",
http_method="POST",
authenticator=authenticator,
request_options_provider=InterpolatedRequestOptionsProvider(
request_body_json={"inputs": "{{ stream_slice.extra_fields['record_ids'] }}"},
config=config,
parameters=parameters,
),
error_handler=DefaultErrorHandler(
backoff_strategies=[
WaitTimeFromHeaderBackoffStrategy(header="Retry-After", config=config, parameters=parameters),
ExponentialBackoffStrategy(config=config, parameters=parameters),
],
response_filters=[
HttpResponseFilter(
action="RETRY",
http_codes={429},
error_message="HubSpot rate limit reached (429). Backoff based on 'Retry-After' header, then exponential backoff fallback.",
config=config,
parameters=parameters,
),
HttpResponseFilter(
action="RETRY",
http_codes={502, 503},
error_message="HubSpot server error (5xx). Retrying with exponential backoff...",
config=config,
parameters=parameters,
),
HttpResponseFilter(
action="RETRY",
http_codes={401},
error_message="Authentication to HubSpot has expired. Authentication will be retried, but if this issue persists, re-authenticate to restore access to HubSpot.",
config=config,
parameters=parameters,
),
HttpResponseFilter(
action="FAIL",
http_codes={530},
error_message="The user cannot be authorized with provided credentials. Please verify that your credentials are valid and try again.",
config=config,
parameters=parameters,
),
HttpResponseFilter(
action="FAIL",
http_codes={403},
error_message="Access denied (403). The authenticated user does not have permissions to access the resource.",
config=config,
parameters=parameters,
),
HttpResponseFilter(
action="FAIL",
http_codes={400},
error_message="Bad request (400). Please verify your credentials and try again.",
config=config,
parameters=parameters,
),
],
config=config,
parameters=parameters,
),
config=config,
parameters=parameters,
)
# Slice over the association names; the parent record IDs are passed to each request via stream_slice extra_fields
slicer = ListPartitionRouter(values=associations, cursor_field="association_name", config=config, parameters=parameters)
selector = RecordSelector(
extractor=DpathExtractor(field_path=["results"], config=config, parameters=parameters),
schema_normalization=TypeTransformer(TransformConfig.NoTransform),
record_filter=None,
transformations=[],
config=config,
parameters=parameters,
)
return SimpleRetriever(
name="associations",
primary_key=None,
requester=requester,
record_selector=selector,
paginator=None, # batch/read never paginates
stream_slicer=slicer,
config=config,
parameters=parameters,
)
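# Illustrative sketch (not part of the connector code): for a parent entity of "deals" and
# associations_list=["contacts", "companies"], the retriever built above issues one request per
# association name, e.g.
#   POST https://api.hubapi.com/crm/v4/associations/deals/contacts/batch/read
#   {"inputs": [{"id": "1"}, {"id": "2"}]}
# and HubspotAssociationsExtractor merges the returned "toObjectId" values back into each parent
# record under the association name, as string IDs.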
@dataclass
class HubspotCRMSearchPaginationStrategy(PaginationStrategy):
"""
This pagination strategy functions similarly to the default cursor pagination strategy. The custom
behavior accounts for Hubspot's /search API limitation that only allows for a max of 10,000 total results
for a query. Once we reach 10,000 records, we start a new query using the latest id collected.
"""
page_size: int
primary_key: str = "id"
RECORDS_LIMIT = 10000
@property
def initial_token(self) -> Optional[Any]:
return {"after": 0}
def next_page_token(
self,
response: requests.Response,
last_page_size: int,
last_record: Optional[Record],
last_page_token_value: Optional[Any] = None,
) -> Optional[Any]:
# Hubspot documentation states that the search endpoints are limited to 10,000 total results
# for any given query. Attempting to page beyond 10,000 will result in a 400 error.
# https://developers.hubspot.com/docs/api/crm/search. We stop getting data at 10,000 and
# start a new search query with the latest id that has been collected.
if last_page_token_value and last_page_token_value.get("after", 0) + last_page_size >= self.RECORDS_LIMIT:
return {"after": 0, "id": int(last_record[self.primary_key]) + 1}
# Stop paginating when there are fewer records than the page size, the current page has no records, or the response has no paging object
if (last_page_size < self.page_size) or last_page_size == 0 or not response.json().get("paging"):
return None
last_id_of_previous_chunk = last_page_token_value.get("id")
if last_id_of_previous_chunk:
return {"after": last_page_token_value["after"] + last_page_size, self.primary_key: last_id_of_previous_chunk}
else:
return {"after": last_page_token_value["after"] + last_page_size}
def get_page_size(self) -> Optional[int]:
return self.page_size
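# Illustrative sketch of the token progression (not part of the connector code), assuming
# page_size=100: {"after": 0} -> {"after": 100} -> ... ; once "after" plus the last page size would
# reach 10,000, the strategy restarts the search from {"after": 0, "id": <last seen id + 1>}
# instead of paging further, since the /search API rejects offsets beyond 10,000 results.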
@dataclass
class HubspotCustomObjectsSchemaLoader(SchemaLoader):
"""
Custom schema loader for HubSpot custom object streams.
This class generates a JSON schema based on the properties defined in the manifest.
These properties are injected into the parameters by the HttpComponentsResolver used within the DynamicDeclarativeStream.
"""
config: Mapping[str, Any]
parameters: InitVar[Mapping[str, Any]]
def __post_init__(self, parameters: Mapping[str, Any]) -> None:
raw_schema_properties: List[Mapping[str, Any]] = parameters.get("schema_properties", [])
properties = self._get_properties(raw_schema=raw_schema_properties)
self._schema = self._generate_schema(properties)
def _get_properties(self, raw_schema: List[Mapping[str, Any]]) -> Mapping[str, Any]:
return {field["name"]: self._field_to_property_schema(field) for field in raw_schema}
def _field_to_property_schema(self, field: Mapping[str, Any]) -> Mapping[str, Any]:
field_type = field["type"]
if field_type in ["string", "enumeration", "phone_number", "object_coordinates", "json"]:
return {"type": ["null", "string"]}
elif field_type == "datetime" or field_type == "date-time":
return {"type": ["null", "string"], "format": "date-time"}
elif field_type == "date":
return {"type": ["null", "string"], "format": "date"}
elif field_type == "number":
return {"type": ["null", "number"]}
elif field_type == "boolean" or field_type == "bool":
return {"type": ["null", "boolean"]}
else:
logger.warn(f"Field {field['name']} has unrecognized type: {field['type']} casting to string.")
return {"type": ["null", "string"]}
def _generate_schema(self, properties: Mapping[str, Any]) -> Mapping[str, Any]:
unnested_properties = {f"properties_{property_name}": property_value for (property_name, property_value) in properties.items()}
schema = {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": ["null", "object"],
"additionalProperties": True,
"properties": {
"id": {"type": ["null", "string"]},
"createdAt": {"type": ["null", "string"], "format": "date-time"},
"updatedAt": {"type": ["null", "string"], "format": "date-time"},
"archived": {"type": ["null", "boolean"]},
"properties": {"type": ["null", "object"], "properties": properties},
**unnested_properties,
},
}
return schema
def get_json_schema(self) -> Mapping[str, Any]:
return self._schema
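# Illustrative sketch (not part of the connector code): given hypothetical schema_properties like
#   [{"name": "color", "type": "enumeration"}, {"name": "price", "type": "number"}]
# the generated schema nests
#   "properties": {"type": ["null", "object"], "properties": {"color": {"type": ["null", "string"]}, "price": {"type": ["null", "number"]}}}
# and also exposes the unnested equivalents "properties_color" and "properties_price" at the top
# level, alongside the static "id", "createdAt", "updatedAt" and "archived" fields.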
_TRUTHY_STRINGS = ("y", "yes", "t", "true", "on", "1")
_FALSEY_STRINGS = ("n", "no", "f", "false", "off", "0")
def _strtobool(value: str, /) -> int:
"""Mimic the behavior of distutils.util.strtobool.
From: https://docs.python.org/2/distutils/apiref.html#distutils.util.strtobool
> Convert a string representation of truth to true (1) or false (0).
> True values are y, yes, t, true, on and 1; false values are n, no, f, false, off and 0. Raises
> `ValueError` if val is anything else.
"""
normalized_str = value.lower().strip()
if normalized_str in _TRUTHY_STRINGS:
return 1
if normalized_str in _FALSEY_STRINGS:
return 0
raise ValueError(f"Invalid boolean value: {normalized_str}")