# # Copyright (c) 2024 Airbyte, Inc., all rights reserved. # import logging from dataclasses import InitVar, dataclass, field from datetime import timedelta from typing import Any, Dict, Iterable, List, Mapping, MutableMapping, Optional, Union import dpath import requests from airbyte_cdk import ( BearerAuthenticator, DpathExtractor, RecordSelector, SimpleRetriever, ) from airbyte_cdk.sources.declarative.auth.oauth import DeclarativeOauth2Authenticator from airbyte_cdk.sources.declarative.auth.selective_authenticator import SelectiveAuthenticator from airbyte_cdk.sources.declarative.auth.token_provider import InterpolatedStringTokenProvider from airbyte_cdk.sources.declarative.datetime.datetime_parser import DatetimeParser from airbyte_cdk.sources.declarative.decoders import Decoder, JsonDecoder from airbyte_cdk.sources.declarative.extractors.http_selector import HttpSelector from airbyte_cdk.sources.declarative.extractors.record_extractor import RecordExtractor from airbyte_cdk.sources.declarative.interpolation import InterpolatedString from airbyte_cdk.sources.declarative.migrations.state_migration import StateMigration from airbyte_cdk.sources.declarative.partition_routers.list_partition_router import ListPartitionRouter from airbyte_cdk.sources.declarative.requesters import HttpRequester from airbyte_cdk.sources.declarative.requesters.error_handlers.backoff_strategies import ( ExponentialBackoffStrategy, WaitTimeFromHeaderBackoffStrategy, ) from airbyte_cdk.sources.declarative.requesters.error_handlers.default_error_handler import DefaultErrorHandler from airbyte_cdk.sources.declarative.requesters.error_handlers.http_response_filter import HttpResponseFilter from airbyte_cdk.sources.declarative.requesters.paginators.strategies.pagination_strategy import PaginationStrategy from airbyte_cdk.sources.declarative.requesters.request_options import InterpolatedRequestOptionsProvider from airbyte_cdk.sources.declarative.requesters.requester import Requester from airbyte_cdk.sources.declarative.schema.schema_loader import SchemaLoader from airbyte_cdk.sources.declarative.transformations import RecordTransformation from airbyte_cdk.sources.types import Config, Record, StreamSlice, StreamState from airbyte_cdk.sources.utils.transform import TransformConfig, TypeTransformer from airbyte_cdk.utils.datetime_helpers import AirbyteDateTime, ab_datetime_format, ab_datetime_now, ab_datetime_parse logger = logging.getLogger("airbyte") @dataclass class NewtoLegacyFieldTransformation(RecordTransformation): """ Implements a custom transformation which adds the legacy field equivalent of v2 fields for streams which contain Deals and Contacts entities. This custom implmentation was developed in lieu of the AddFields component due to the dynamic-nature of the record properties for the HubSpot source. Each For example: hs_v2_date_exited_{stage_id} -> hs_date_exited_{stage_id} where {stage_id} is a user-generated value """ field_mapping: Mapping[str, str] def transform( self, record_or_schema: Dict[str, Any], config: Optional[Config] = None, stream_state: Optional[StreamState] = None, stream_slice: Optional[StreamSlice] = None, ) -> None: """ Transform a record in place by adding fields directly to the record by manipulating the injected fields into a legacy field to avoid breaking syncs. :param record_or_schema: The input record or schema to be transformed. """ is_record = record_or_schema.get("properties") is not None for field, value in list(record_or_schema.get("properties", record_or_schema).items()): for legacy_field, new_field in self.field_mapping.items(): if new_field in field: transformed_field = field.replace(new_field, legacy_field) if legacy_field == "hs_lifecyclestage_" and not transformed_field.endswith("_date"): transformed_field += "_date" if is_record: if record_or_schema["properties"].get(transformed_field) is None: record_or_schema["properties"][transformed_field] = value else: if record_or_schema.get(transformed_field) is None: record_or_schema[transformed_field] = value class MigrateEmptyStringState(StateMigration): cursor_field: str config: Config cursor_format: Optional[str] = None def __init__(self, cursor_field, config: Config, cursor_format: Optional[str] = None): self.cursor_field = cursor_field self.cursor_format = cursor_format self.config = config def migrate(self, stream_state: Mapping[str, Any]) -> Mapping[str, Any]: # if start date wasn't provided in the config default date will be used start_date = self.config.get("start_date", "2006-06-01T00:00:00.000Z") if self.cursor_format: dt = ab_datetime_parse(start_date) formatted_start_date = DatetimeParser().format(dt, self.cursor_format) return {self.cursor_field: formatted_start_date} return {self.cursor_field: start_date} def should_migrate(self, stream_state: Mapping[str, Any]) -> bool: return stream_state.get(self.cursor_field) == "" @dataclass class HubspotPropertyHistoryExtractor(RecordExtractor): """ Custom record extractor which parses the JSON response from Hubspot and for each instance returned for the specified object type (ex. Contacts, Deals, etc.), yields records for every requested property. Because this is a property history stream, an individual property can yield multiple records representing the previous version of that property. The custom behavior of this component is: - Iterating over and extracting property history instances as individual records - Injecting fields from out levels of the response into yielded records to be used as primary keys """ field_path: List[Union[InterpolatedString, str]] entity_primary_key: str additional_keys: Optional[List[str]] config: Config parameters: InitVar[Mapping[str, Any]] decoder: Decoder = field(default_factory=lambda: JsonDecoder(parameters={})) def __post_init__(self, parameters: Mapping[str, Any]) -> None: self._field_path = [InterpolatedString.create(path, parameters=parameters) for path in self.field_path] for path_index in range(len(self.field_path)): if isinstance(self.field_path[path_index], str): self._field_path[path_index] = InterpolatedString.create(self.field_path[path_index], parameters=parameters) def extract_records(self, response: requests.Response) -> Iterable[Mapping[str, Any]]: for body in self.decoder.decode(response): results = [] if len(self._field_path) == 0: extracted = body else: path = [path.eval(self.config) for path in self._field_path] if "*" in path: extracted = dpath.values(body, path) else: extracted = dpath.get(body, path, default=[]) # type: ignore # extracted will be a MutableMapping, given input data structure if isinstance(extracted, list): results = extracted elif extracted: raise ValueError(f"field_path should always point towards a list field in the response body for property_history streams") for result in results: properties_with_history = result.get("propertiesWithHistory") primary_key = result.get("id") additional_keys = ( {additional_key: result.get(additional_key) for additional_key in self.additional_keys} if self.additional_keys else {} ) if properties_with_history: for property_name, value_dict in properties_with_history.items(): if property_name == "hs_lastmodifieddate": # Skipping the lastmodifieddate since it only returns the value # when one field of a record was changed no matter which # field was changed. It therefore creates overhead, since for # every changed property there will be the date it was changed in itself # and a change in the lastmodifieddate field. continue for version in value_dict: version["property"] = property_name version[self.entity_primary_key] = primary_key yield version | additional_keys @dataclass class AddFieldsFromEndpointTransformation(RecordTransformation): """ Makes request to provided endpoint and updates record with retrieved data. requester: Requester record_selector: HttpSelector """ requester: Requester record_selector: HttpSelector def transform( self, record: Dict[str, Any], config: Optional[Config] = None, stream_state: Optional[StreamState] = None, stream_slice: Optional[StreamSlice] = None, ) -> None: additional_data_response = self.requester.send_request( stream_slice=StreamSlice(partition={"parent_id": record["id"]}, cursor_slice={}) ) additional_data = self.record_selector.select_records(response=additional_data_response, stream_state={}, records_schema={}) for data in additional_data: record.update(data) @dataclass class MarketingEmailStatisticsTransformation(RecordTransformation): """ Custom transformation for HubSpot Marketing Emails that fetches statistics from the v3 API. This transformation is needed because the v3 API separates email data and statistics into two endpoints: - GET /marketing/v3/emails - for email data - GET /marketing/v3/emails/{emailId}/statistics - for statistics This transformation fetches statistics for each email and merges them into the email record. """ requester: Requester record_selector: HttpSelector def transform( self, record: Dict[str, Any], config: Optional[Config] = None, stream_state: Optional[StreamState] = None, stream_slice: Optional[StreamSlice] = None, ) -> None: try: # Fetch statistics for this email using the v3 statistics endpoint statistics_response = self.requester.send_request( stream_slice=StreamSlice(partition={"email_id": record["id"]}, cursor_slice={}) ) statistics_data = self.record_selector.select_records(response=statistics_response, stream_state={}, records_schema={}) # Merge statistics into the email record for stats in statistics_data: record.update(stats) except Exception as e: # Log the error but don't fail the entire sync # This ensures that if statistics are unavailable for some emails, # we still get the email data logger.warning(f"Failed to fetch statistics for email {record.get('id', 'unknown')}: {str(e)}") pass @dataclass class HubspotSchemaExtractor(RecordExtractor): """ Transformation that encapsulates the list of properties under a single object because DynamicSchemaLoader only accepts the set of dynamic schema fields as a single record. This might be doable with the existing DpathExtractor configuration. """ config: Config parameters: InitVar[Mapping[str, Any]] decoder: Decoder = field(default_factory=lambda: JsonDecoder(parameters={})) def extract_records(self, response: requests.Response) -> Iterable[Mapping[str, Any]]: yield {"properties": list(self.decoder.decode(response))} @dataclass class HubspotRenamePropertiesTransformation(RecordTransformation): """ Custom transformation that takes in a record that represents a map of all dynamic properties retrieved from the Hubspot properties endpoint. This mapping nests all of these fields under a sub-object called `properties` and updates all the property field names at the top level to be prefixed with `properties_`. """ def transform( self, record: Dict[str, Any], config: Optional[Config] = None, stream_state: Optional[StreamState] = None, stream_slice: Optional[StreamSlice] = None, ) -> None: transformed_record = { "properties": { "type": "object", "properties": {}, } } for key, value in record.items(): transformed_record["properties"]["properties"][key] = value updated_key = f"properties_{key}" transformed_record[updated_key] = value record.clear() record.update(transformed_record) class EngagementsHttpRequester(HttpRequester): """ Engagements stream uses different endpoints: - Engagements Recent if start_date/state is less than 30 days and API is able to return all records (<10k), or - Engagements All which extracts all records, but supports filter on connector side Recent Engagements API: https://legacydocs.hubspot.com/docs/methods/engagements/get-recent-engagements Important: This endpoint returns only last 10k most recently updated records in the last 30 days. All Engagements API: https://legacydocs.hubspot.com/docs/methods/engagements/get-all-engagements Important: 1. The stream is declared to use one stream slice from start date(default/config/state) to time.now(). It doesn't have step. Based on this we can use stream_slice["start_time"] and be sure that this is equal to value in initial state. Stream Slice [start_time] is used to define _use_recent_api, concurrent processing of date windows is incompatible and therefore does not support using a step 2.The stream is declared to use 250 as page size param in pagination. Recent Engagements API have 100 as max param but doesn't fail is bigger value was provided and returns to 100 as default. 3. The stream has is_client_side_incremental=true to filter Engagements All response. """ recent_api_total_records_limit = 10000 recent_api_last_days_limit = 29 recent_api_path = "/engagements/v1/engagements/recent/modified" all_api_path = "/engagements/v1/engagements/paged" _use_recent_api = None def should_use_recent_api(self, stream_slice: StreamSlice) -> bool: if self._use_recent_api is not None: return self._use_recent_api # Recent engagements API returns records updated in the last 30 days only. If start time is older All engagements API should be used if int(stream_slice["start_time"]) >= int( DatetimeParser().format((ab_datetime_now() - timedelta(days=self.recent_api_last_days_limit)), "%ms") ): # Recent engagements API returns only 10k most recently updated records. # API response indicates that there are more records so All engagements API should be used _, response = self._http_client.send_request( http_method=self.get_method().value, url=self._join_url(self.get_url_base(), self.recent_api_path), headers=self._request_headers({}, stream_slice, {}, {}), params={"count": 250, "since": stream_slice["start_time"]}, request_kwargs={"stream": self.stream_response}, ) if response.json().get("total") <= self.recent_api_total_records_limit: self._use_recent_api = True else: self._use_recent_api = False return self._use_recent_api def get_path( self, *, stream_state: Optional[StreamState] = None, stream_slice: Optional[StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None, ) -> str: if self.should_use_recent_api(stream_slice): return self.recent_api_path return self.all_api_path def get_request_params( self, *, stream_state: Optional[StreamState] = None, stream_slice: Optional[StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None, ) -> MutableMapping[str, Any]: request_params = self._request_options_provider.get_request_params( stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token, ) if self.should_use_recent_api(stream_slice): request_params.update({"since": stream_slice["start_time"]}) return request_params class EntitySchemaNormalization(TypeTransformer): """ For CRM object and CRM Search streams, which have dynamic schemas, custom normalization should be applied. Convert record's received value according to its declared catalog dynamic schema type and format. Empty strings for fields that have non string type converts to None. Numeric strings for fields that have number type converts to integer type, otherwise to number. Strings like "true"/"false" with boolean type converts to boolean. Date and Datime fields converts to format datetime string. Set __ab_apply_cast_datetime: false in field definition, if you don't need to format datetime strings. """ def __init__(self, *args, **kwargs): config = TransformConfig.CustomSchemaNormalization super().__init__(config) self.registerCustomTransform(self.get_transform_function()) def get_transform_function(self): def transform_function(original_value: str, field_schema: Dict[str, Any]) -> Any: target_type = field_schema.get("type") target_format = field_schema.get("format") if "null" in target_type: if original_value is None: return original_value # Sometimes hubspot output empty string on field with format set. # Set it to null to avoid errors on destination' normalization stage. if target_format and original_value == "": return None if isinstance(original_value, str): if "string" not in target_type and original_value == "": # do not cast empty strings, return None instead to be properly cast. transformed_value = None return transformed_value if "number" in target_type: # do not cast numeric IDs into float, use integer instead target_type = int if original_value.isnumeric() else float # In some cases, the returned value from Hubspot is non-numeric despite the discovered schema explicitly declaring a numeric type. # For example, a field with a type of "number" might return a string: "3092727991;3881228353;15895321999" # So, we attempt to cast the value to the declared type, and failing that, we log the error and return the original value. # This matches the previous behavior in the Python implementation. try: transformed_value = target_type(original_value.replace(",", "")) return transformed_value except ValueError: logger.exception(f"Could not cast field value {original_value} to {target_type}") return original_value if "boolean" in target_type and original_value.lower() in ["true", "false"]: transformed_value = str(original_value).lower() == "true" return transformed_value if target_format: if field_schema.get("__ab_apply_cast_datetime") is False: return original_value if "date" == target_format: dt = EntitySchemaNormalization.convert_datetime_string_to_ab_datetime(original_value) if dt: transformed_value = DatetimeParser().format(dt, "%Y-%m-%d") return transformed_value else: return original_value if "date-time" == target_format: dt = EntitySchemaNormalization.convert_datetime_string_to_ab_datetime(original_value) if dt: transformed_value = ab_datetime_format(dt) return transformed_value else: return original_value if "properties" in field_schema and isinstance(original_value, dict): normalized_nested_properties = dict() for nested_key, nested_val in original_value.items(): nested_property_schema = field_schema.get("properties").get(nested_key) if nested_property_schema: normalized_nested_properties[nested_key] = transform_function(nested_val, nested_property_schema) else: normalized_nested_properties[nested_key] = nested_val return normalized_nested_properties else: return self.default_convert(original_value, field_schema) return transform_function @staticmethod def convert_datetime_string_to_ab_datetime(datetime_str: str) -> Optional[AirbyteDateTime]: """ Implements the existing source-hubspot behavior where the API response can return either a timestamp with seconds or milliseconds precision. We first attempt to parse in seconds, then millisecond, or if unparsable we log a warning and emit the original value. Returns None if the string could not be parsed into a datetime object because the existing source emits the original value and logs warning. """ if not datetime_str: return None # Hubspot sometimes returns datetime strings as a float which can cause an OverflowError. When a float # string is detected, the string is converted into an integer string before parsing try: float(datetime_str) if "." in datetime_str: datetime_str = datetime_str.split(".")[0] except ValueError: pass try: return ab_datetime_parse(datetime_str) except (ValueError, TypeError, OverflowError) as ex: pass try: return ab_datetime_parse(int(datetime_str) // 1000) except (ValueError, TypeError, OverflowError) as ex: logger.warning(f"Couldn't parse date/datetime string field. Timestamp field value: {datetime_str}. Ex: {ex}") return None class HubspotFlattenAssociationsTransformation(RecordTransformation): """ A record transformation that flattens the `associations` field in HubSpot records. This transformation takes a nested dictionary under the `associations` key and extracts the IDs of associated objects. The extracted lists of IDs are added as new top-level fields in the record, using the association name as the key (spaces replaced with underscores). Example: Input: { "id": 1, "associations": { "Contacts": {"results": [{"id": 101}, {"id": 102}]} } } Output: { "id": 1, "Contacts": [101, 102] } """ def transform( self, record: Dict[str, Any], config: Optional[Config] = None, stream_state: Optional[StreamState] = None, stream_slice: Optional[StreamSlice] = None, ) -> None: if "associations" in record: associations = record.pop("associations") for name, association in associations.items(): record[name.replace(" ", "_")] = [row["id"] for row in association.get("results", [])] @dataclass class HubspotAssociationsExtractor(RecordExtractor): """ Custom extractor for HubSpot association-enriched records. This extractor: - Navigates a specified `field_path` within the JSON response to extract a list of primary entities. - Gets records IDs to use in associations retriever body. - Uses a secondary retriever to fetch associated objects for each entity (based on provided `associations_list`). - Merges associated object IDs back into each entity's record under the corresponding association name. Attributes: field_path: Path to the list of records in the API response. entity: The field used for associations retriever endpoint. associations_list: List of associations to fetch (e.g., ["contacts", "companies"]). """ field_path: List[Union[InterpolatedString, str]] entity: Union[InterpolatedString, str] associations_list: Union[List[str], Union[InterpolatedString, str]] config: Config parameters: InitVar[Mapping[str, Any]] decoder: Decoder = field(default_factory=lambda: JsonDecoder(parameters={})) def __post_init__(self, parameters: Mapping[str, Any]) -> None: self._field_path = [InterpolatedString.create(path, parameters=parameters) for path in self.field_path] for path_index in range(len(self.field_path)): if isinstance(self.field_path[path_index], str): self._field_path[path_index] = InterpolatedString.create(self.field_path[path_index], parameters=parameters) self._entity = InterpolatedString.create(self.entity, parameters=parameters) # The list of associations can either be provided as a static list of constants or evaluated from an interpolated string if isinstance(self.associations_list, list): self._associations_list = self.associations_list else: self._associations_list = InterpolatedString.create(self.associations_list, parameters=parameters) self._associations_retriever = build_associations_retriever( associations_list=self._associations_list, parent_entity=self._entity, config=self.config, ) def extract_records(self, response: requests.Response) -> Iterable[Mapping[str, Any]]: for body in self.decoder.decode(response): if len(self._field_path) == 0: extracted = body else: path = [path.eval(self.config) for path in self._field_path] if "*" in path: extracted = dpath.values(body, path) else: extracted = dpath.get(body, path, default=[]) # type: ignore # extracted will be a MutableMapping, given input data structure if isinstance(extracted, list): records = extracted elif extracted: raise ValueError(f"field_path should always point towards a list field in the response body") # If no records were extracted, no need to call the associations retriever if not records: continue records_by_pk = {record["id"]: record for record in records} record_ids = [{"id": record["id"]} for record in records] slices = self._associations_retriever.stream_slicer.stream_slices() for _slice in slices: # Append the list of extracted records so they are usable during interpolation of the JSON request body stream_slice = StreamSlice( cursor_slice=_slice.cursor_slice, partition=_slice.partition, extra_fields={"record_ids": record_ids} ) logger.debug(f"Reading {_slice} associations of {self._entity.eval(config=self.config)}") associations = self._associations_retriever.read_records({}, stream_slice=stream_slice) for group in associations: slice_value = stream_slice["association_name"] current_record = records_by_pk[group["from"]["id"]] associations_list = current_record.get(slice_value, []) associations_list.extend(association["toObjectId"] for association in group["to"]) # Associations are defined in the schema as string ids but come in the API response as integer ids current_record[slice_value] = [str(association) for association in associations_list] yield from records_by_pk.values() def build_associations_retriever( *, associations_list: Union[List[str], InterpolatedString], parent_entity: InterpolatedString, config: Config, ) -> SimpleRetriever: """ Instantiates a SimpleRetriever that makes requests against: POST /crm/v4/associations/{self.parent_entity}/{stream_slice.association}/batch/read The current architecture of the low-code framework makes it difficult to instantiate components in arbitrary locations within the manifest.yaml. For example, the only place where a SimpleRetriever can be instantiated is as a field of DeclarativeStream because the `model_to_component_factory.py.create_simple_retriever()` constructor takes incoming parameters from values of the DeclarativeStream. So we are unable to build the associations_retriever, from within this custom HubspotAssociationsExtractor because we will be missing required parameters that are not supplied by the SimpleRetrieverModel. And we're left with the workaround of building the runtime components in this method. """ parameters: Mapping[str, Any] = {} evaluated_entity = parent_entity.eval(config=config) if isinstance(associations_list, InterpolatedString): associations = associations_list.eval(config=config) else: associations = associations_list bearer_authenticator = BearerAuthenticator( token_provider=InterpolatedStringTokenProvider( api_token=config.get("credentials", {}).get("access_token", ""), config=config, parameters=parameters, ), config=config, parameters=parameters, ) # Use default values to create a component if another authentication method is used. # If values are missing it will fail in the parent stream oauth_authenticator = DeclarativeOauth2Authenticator( config=config, parameters=parameters, client_id=config.get("credentials", {}).get("client_id", "client_id"), client_secret=config.get("credentials", {}).get("client_secret", "client_secret"), refresh_token=config.get("credentials", {}).get("refresh_token", "refresh_token"), token_refresh_endpoint="https://api.hubapi.com/oauth/v1/token", ) authenticator = SelectiveAuthenticator( config, authenticators={"Private App Credentials": bearer_authenticator, "OAuth Credentials": oauth_authenticator}, authenticator_selection_path=["credentials", "credentials_title"], ) requester = HttpRequester( name="associations", url_base="https://api.hubapi.com", path=f"/crm/v4/associations/{evaluated_entity}/" + "{{ stream_partition['association_name'] }}/batch/read", http_method="POST", authenticator=authenticator, request_options_provider=InterpolatedRequestOptionsProvider( request_body_json={"inputs": "{{ stream_slice.extra_fields['record_ids'] }}"}, config=config, parameters=parameters, ), error_handler=DefaultErrorHandler( backoff_strategies=[ WaitTimeFromHeaderBackoffStrategy(header="Retry-After", config=config, parameters=parameters), ExponentialBackoffStrategy(config=config, parameters=parameters), ], response_filters=[ HttpResponseFilter( action="RETRY", http_codes={429}, error_message="HubSpot rate limit reached (429). Backoff based on 'Retry-After' header, then exponential backoff fallback.", config=config, parameters=parameters, ), HttpResponseFilter( action="RETRY", http_codes={502, 503}, error_message="HubSpot server error (5xx). Retrying with exponential backoff...", config=config, parameters=parameters, ), HttpResponseFilter( action="RETRY", http_codes={401}, error_message="Authentication to HubSpot has expired. Authentication will be retried, but if this issue persists, re-authenticate to restore access to HubSpot.", config=config, parameters=parameters, ), HttpResponseFilter( action="FAIL", http_codes={530}, error_message="The user cannot be authorized with provided credentials. Please verify that your credentials are valid and try again.", config=config, parameters=parameters, ), HttpResponseFilter( action="FAIL", http_codes={403}, error_message="Access denied (403). The authenticated user does not have permissions to access the resource.", config=config, parameters=parameters, ), HttpResponseFilter( action="FAIL", http_codes={400}, error_message="Bad request (400). Please verify your credentials and try again.", config=config, parameters=parameters, ), ], config=config, parameters=parameters, ), config=config, parameters=parameters, ) # Slice over IDs emitted by the parent stream slicer = ListPartitionRouter(values=associations, cursor_field="association_name", config=config, parameters=parameters) selector = RecordSelector( extractor=DpathExtractor(field_path=["results"], config=config, parameters=parameters), schema_normalization=TypeTransformer(TransformConfig.NoTransform), record_filter=None, transformations=[], config=config, parameters=parameters, ) return SimpleRetriever( name="associations", primary_key=None, requester=requester, record_selector=selector, paginator=None, # batch/read never paginates stream_slicer=slicer, config=config, parameters=parameters, ) @dataclass class HubspotCRMSearchPaginationStrategy(PaginationStrategy): """ This pagination strategy functioning similarly to the default cursor pagination strategy. The custom behavior accounts for Hubspot's /search API limitation that only allows for a max of 10,000 total results for a query. Once we reach 10,000 records, we start a new query using the latest id collected. """ page_size: int primary_key: str = "id" RECORDS_LIMIT = 10000 @property def initial_token(self) -> Optional[Any]: return {"after": 0} def next_page_token( self, response: requests.Response, last_page_size: int, last_record: Optional[Record], last_page_token_value: Optional[Any] = None, ) -> Optional[Any]: # Hubspot documentation states that the search endpoints are limited to 10,000 total results # for any given query. Attempting to page beyond 10,000 will result in a 400 error. # https://developers.hubspot.com/docs/api/crm/search. We stop getting data at 10,000 and # start a new search query with the latest id that has been collected. if last_page_token_value and last_page_token_value.get("after", 0) + last_page_size >= self.RECORDS_LIMIT: return {"after": 0, "id": int(last_record[self.primary_key]) + 1} # Stop paginating when there are fewer records than the page size or the current page has no records if (last_page_size < self.page_size) or last_page_size == 0 or not response.json().get("paging"): return None last_id_of_previous_chunk = last_page_token_value.get("id") if last_id_of_previous_chunk: return {"after": last_page_token_value["after"] + last_page_size, self.primary_key: last_id_of_previous_chunk} else: return {"after": last_page_token_value["after"] + last_page_size} def get_page_size(self) -> Optional[int]: return self.page_size @dataclass class HubspotCustomObjectsSchemaLoader(SchemaLoader): """ Custom schema loader for HubSpot custom object streams. This class generates a JSON schema based on the properties defined in the manifest. These properties are injected into the parameters by the HttpComponentsResolver used within the DynamicDeclarativeStream. """ config: Mapping[str, Any] parameters: InitVar[Mapping[str, Any]] def __post_init__(self, parameters: Mapping[str, Any]) -> None: raw_schema_properties: List[Mapping[str, Any]] = parameters.get("schema_properties", {}) properties = self._get_properties(raw_schema=raw_schema_properties) self._schema = self._generate_schema(properties) def _get_properties(self, raw_schema: List[Mapping[str, Any]]) -> Mapping[str, Any]: return {field["name"]: self._field_to_property_schema(field) for field in raw_schema} def _field_to_property_schema(self, field: Mapping[str, Any]) -> Mapping[str, Any]: field_type = field["type"] if field_type in ["string", "enumeration", "phone_number", "object_coordinates", "json"]: return {"type": ["null", "string"]} elif field_type == "datetime" or field_type == "date-time": return {"type": ["null", "string"], "format": "date-time"} elif field_type == "date": return {"type": ["null", "string"], "format": "date"} elif field_type == "number": return {"type": ["null", "number"]} elif field_type == "boolean" or field_type == "bool": return {"type": ["null", "boolean"]} else: logger.warn(f"Field {field['name']} has unrecognized type: {field['type']} casting to string.") return {"type": ["null", "string"]} def _generate_schema(self, properties: Mapping[str, Any]) -> Mapping[str, Any]: unnested_properties = {f"properties_{property_name}": property_value for (property_name, property_value) in properties.items()} schema = { "$schema": "http://json-schema.org/draft-07/schema#", "type": ["null", "object"], "additionalProperties": True, "properties": { "id": {"type": ["null", "string"]}, "createdAt": {"type": ["null", "string"], "format": "date-time"}, "updatedAt": {"type": ["null", "string"], "format": "date-time"}, "archived": {"type": ["null", "boolean"]}, "properties": {"type": ["null", "object"], "properties": properties}, **unnested_properties, }, } return schema def get_json_schema(self) -> Mapping[str, Any]: return self._schema _TRUTHY_STRINGS = ("y", "yes", "t", "true", "on", "1") _FALSEY_STRINGS = ("n", "no", "f", "false", "off", "0") def _strtobool(value: str, /) -> int: """Mimic the behavior of distutils.util.strtobool. From: https://docs.python.org/2/distutils/apiref.html#distutils.util.strtobool > Convert a string representation of truth to true (1) or false (0). > True values are y, yes, t, true, on and 1; false values are n, no, f, false, off and 0. Raises > `ValueError` if val is anything else. """ normalized_str = value.lower().strip() if normalized_str in _TRUTHY_STRINGS: return 1 if normalized_str in _FALSEY_STRINGS: return 0 raise ValueError(f"Invalid boolean value: {normalized_str}")