1
0
mirror of synced 2025-12-25 02:09:19 -05:00

[source-hubspot] - migrate deals_pipelinesto low code (#59195)

This commit is contained in:
Patrick Nilan
2025-05-07 15:37:00 -07:00
committed by GitHub
parent 1542c0f116
commit c7fe31a600
9 changed files with 189 additions and 139 deletions

View File

@@ -10,7 +10,7 @@ data:
connectorSubtype: api
connectorType: source
definitionId: 36c891d9-4bd9-43ac-bad2-10e12756272c
dockerImageTag: 4.9.0
dockerImageTag: 4.10.0
dockerRepository: airbyte/source-hubspot
documentationUrl: https://docs.airbyte.com/integrations/sources/hubspot
erdUrl: https://dbdocs.io/airbyteio/source-hubspot?view=relationships

View File

@@ -3,7 +3,7 @@ requires = [ "poetry-core>=1.0.0",]
build-backend = "poetry.core.masonry.api"
[tool.poetry]
version = "4.9.0"
version = "4.10.0"
name = "source-hubspot"
description = "Source implementation for HubSpot."
authors = [ "Airbyte <contact@airbyte.io>",]

View File

@@ -579,14 +579,56 @@ definitions:
type: KeyTransformation
prefix: counters_
deal_pipelines_stream:
$ref: "#/definitions/stream_base"
name: deal_pipelines
primary_key:
- pipelineId
retriever:
type: SimpleRetriever
requester:
$ref: "#/definitions/base_requester"
path: /crm-pipelines/v1/pipelines/deals
record_selector:
type: RecordSelector
transform_before_filtering: true
extractor:
type: DpathExtractor
field_path:
- results
decoder:
type: JsonDecoder
incremental_sync:
type: DatetimeBasedCursor
cursor_field: updatedAt
start_datetime:
type: MinMaxDatetime
datetime: "{{ format_datetime(config.get('start_date', '2006-06-01T00:00:00Z'), '%ms', '%Y-%m-%dT%H:%M:%SZ') }}"
datetime_format: "%ms"
datetime_format: "%ms"
lookback_window: P{{ config.get('lookback_window', 0) }}D
cursor_datetime_formats:
- "%ms"
is_client_side_incremental: true
schema_loader:
type: InlineSchemaLoader
schema:
$ref: "#/schemas/deal_pipelines"
transformations:
- type: AddFields
fields:
- path: ["updatedAt"]
value: "{{ record.get('updatedAt') or record['createdAt'] }}"
streams:
- "#/definitions/campaigns_stream"
- "#/definitions/companies_property_history_stream"
- "#/definitions/contacts_property_history_stream"
- "#/definitions/deal_pipelines_stream"
- "#/definitions/deals_property_history_stream"
- "#/definitions/email_subscriptions_stream"
- "#/definitions/marketing_emails_stream"
- "#/definitions/ticket_pipelines_stream"
- "#/definitions/campaigns_stream"
# HubSpot account is limited to 110 requests every 10 seconds https://developers.hubspot.com/docs/guides/apps/api-usage/usage-details#rate-limits
concurrency_level:
@@ -595,6 +637,111 @@ concurrency_level:
max_concurrency: 40
schemas:
deal_pipelines:
$schema: http://json-schema.org/draft-07/schema#
type:
- "null"
- object
additionalProperties: true
properties:
label:
description: The label or name of the deal pipeline.
type:
- "null"
- string
displayOrder:
description: The ordering of the deal pipeline for display.
type:
- "null"
- integer
active:
description: Indicates if the deal pipeline is currently active or not.
type:
- "null"
- boolean
stages:
description: List of deal stages within the pipeline.
type:
- "null"
- array
items:
type:
- "null"
- object
properties:
label:
description: The label or name of the deal stage.
type:
- "null"
- string
displayOrder:
description: The ordering of the deal stage for display within the pipeline.
type:
- "null"
- integer
metadata:
description: Additional information related to the deal stage.
type:
- "null"
- object
properties:
isClosed:
description: Indicates if the deal stage is considered closed or not.
type:
- "null"
- string
probability:
description: The probability of closing a deal at this stage.
type:
- "null"
- string
stageId:
description: The unique identifier of the deal stage.
type:
- "null"
- string
createdAt:
description: Timestamp for the creation date of the deal stage.
type:
- "null"
- integer
updatedAt:
description: Timestamp for the last update to the deal stage.
type:
- "null"
- integer
active:
description: Indicates if the deal stage is currently active or not.
type:
- "null"
- boolean
objectType:
description: The type of object this deal pipeline is associated with.
type:
- "null"
- string
objectTypeId:
description: The ID of the object type this deal pipeline is associated with.
type:
- "null"
- string
pipelineId:
description: The unique identifier of the deal pipeline.
type:
- "null"
- string
createdAt:
description: Timestamp for the creation date of the deal pipeline.
type:
- "null"
- integer
updatedAt:
description: Timestamp for the last update to the deal pipeline.
type:
- "null"
- integer
default:
description: Indicates if this pipeline is the default one in the system.
companies_property_history:
$schema: "http://json-schema.org/draft-07/schema"
type:

View File

@@ -1,89 +0,0 @@
{
"$schema": "http://json-schema.org/draft-07/schema#",
"type": ["null", "object"],
"properties": {
"label": {
"description": "The label or name of the deal pipeline.",
"type": ["null", "string"]
},
"displayOrder": {
"description": "The ordering of the deal pipeline for display.",
"type": ["null", "integer"]
},
"active": {
"description": "Indicates if the deal pipeline is currently active or not.",
"type": ["null", "boolean"]
},
"stages": {
"description": "List of deal stages within the pipeline.",
"type": ["null", "array"],
"items": {
"type": ["null", "object"],
"properties": {
"label": {
"description": "The label or name of the deal stage.",
"type": ["null", "string"]
},
"displayOrder": {
"description": "The ordering of the deal stage for display within the pipeline.",
"type": ["null", "integer"]
},
"metadata": {
"description": "Additional information related to the deal stage.",
"type": ["null", "object"],
"properties": {
"isClosed": {
"description": "Indicates if the deal stage is considered closed or not.",
"type": ["null", "string"]
},
"probability": {
"description": "The probability of closing a deal at this stage.",
"type": ["null", "string"]
}
}
},
"stageId": {
"description": "The unique identifier of the deal stage.",
"type": ["null", "string"]
},
"createdAt": {
"description": "Timestamp for the creation date of the deal stage.",
"type": ["null", "integer"]
},
"updatedAt": {
"description": "Timestamp for the last update to the deal stage.",
"type": ["null", "integer"]
},
"active": {
"description": "Indicates if the deal stage is currently active or not.",
"type": ["null", "boolean"]
}
}
}
},
"objectType": {
"description": "The type of object this deal pipeline is associated with.",
"type": ["null", "string"]
},
"objectTypeId": {
"description": "The ID of the object type this deal pipeline is associated with.",
"type": ["null", "string"]
},
"pipelineId": {
"description": "The unique identifier of the deal pipeline.",
"type": ["null", "string"]
},
"createdAt": {
"description": "Timestamp for the creation date of the deal pipeline.",
"type": ["null", "integer"]
},
"updatedAt": {
"description": "Timestamp for the last update to the deal pipeline.",
"type": ["null", "integer"]
},
"default": {
"description": "Indicates if this pipeline is the default one in the system.",
"type": ["null", "boolean"]
}
}
}

View File

@@ -29,7 +29,6 @@ from source_hubspot.streams import (
ContactsMergedAudit,
ContactsWebAnalytics,
CustomObject,
DealPipelines,
Deals,
DealsArchived,
DealSplits,
@@ -74,6 +73,7 @@ DEFAULT_START_DATE = "2006-06-01T00:00:00Z"
scopes = {
"companies_property_history": {"crm.objects.companies.read"},
"contacts_property_history": {"crm.objects.contacts.read"},
"deal_pipelines": {"crm.objects.contacts.read"},
"deals_property_history": {"crm.objects.deals.read"},
"email_subscriptions": {"content"},
"marketing_emails": {"content"},
@@ -199,7 +199,6 @@ class SourceHubspot(YamlDeclarativeSource):
ContactsFormSubmissions(**common_params),
ContactsListMemberships(**common_params),
ContactsMergedAudit(**common_params),
DealPipelines(**common_params),
DealSplits(**common_params),
Deals(**common_params),
DealsArchived(**common_params),

View File

@@ -1578,20 +1578,6 @@ class DealsArchived(ClientSideIncrementalStream):
return params
class DealPipelines(ClientSideIncrementalStream):
"""Deal pipelines, API v1,
This endpoint requires the contacts scope the tickets scope.
Docs: https://legacydocs.hubspot.com/docs/methods/pipelines/get_pipelines_for_object_type
"""
url = "/crm-pipelines/v1/pipelines/deals"
updated_at_field = "updatedAt"
created_at_field = "createdAt"
cursor_field_datetime_format = "x"
primary_key = "pipelineId"
scopes = {"crm.objects.contacts.read"}
class DealSplits(CRMSearchStream):
"""Deal splits, API v3"""

View File

@@ -23,7 +23,7 @@ from .response_builder.streams import GenericResponseBuilder, HubspotStreamRespo
@freezegun.freeze_time("2024-03-03T14:42:00Z")
class HubspotTestCase:
DT_FORMAT = '%Y-%m-%dT%H:%M:%SZ'
DT_FORMAT = "%Y-%m-%dT%H:%M:%SZ"
OBJECT_ID = "testID"
ACCESS_TOKEN = "new_access_token"
CURSOR_FIELD = "occurredAt"
@@ -51,36 +51,46 @@ class HubspotTestCase:
@classmethod
def oauth_config(cls, start_date: Optional[str] = None) -> Dict[str, Any]:
start_date = start_date or cls.dt_str(cls.start_date())
return ConfigBuilder().with_start_date(start_date).with_auth(
{
"credentials_title": "OAuth Credentials",
"redirect_uri": "https://airbyte.io",
"client_id": "client_id",
"client_secret": "client_secret",
"refresh_token": "refresh_token",
}
).build()
return (
ConfigBuilder()
.with_start_date(start_date)
.with_auth(
{
"credentials_title": "OAuth Credentials",
"redirect_uri": "https://airbyte.io",
"client_id": "client_id",
"client_secret": "client_secret",
"refresh_token": "refresh_token",
}
)
.build()
)
@classmethod
def private_token_config(cls, token: str, start_date: Optional[str] = None) -> Dict[str, Any]:
start_date = start_date or cls.dt_str(cls.start_date())
return ConfigBuilder().with_start_date(start_date).with_auth(
{
"credentials_title": "Private App Credentials",
"access_token": token,
}
).build()
return (
ConfigBuilder()
.with_start_date(start_date)
.with_auth(
{
"credentials_title": "Private App Credentials",
"access_token": token,
}
)
.build()
)
@classmethod
def mock_oauth(cls, http_mocker: HttpMocker, token: str):
creds = cls.oauth_config()["credentials"]
req = OAuthRequestBuilder().with_client_id(
creds["client_id"]
).with_client_secret(
creds["client_secret"]
).with_refresh_token(
creds["refresh_token"]
).build()
req = (
OAuthRequestBuilder()
.with_client_id(creds["client_id"])
.with_client_secret(creds["client_secret"])
.with_refresh_token(creds["refresh_token"])
.build()
)
response = GenericResponseBuilder().with_value("access_token", token).with_value("expires_in", 7200).build()
http_mocker.post(req, response)
@@ -92,7 +102,7 @@ class HubspotTestCase:
def mock_custom_objects(cls, http_mocker: HttpMocker):
http_mocker.get(
CustomObjectsRequestBuilder().build(),
HttpResponseBuilder({}, records_path=FieldPath("results"), pagination_strategy=None).build()
HttpResponseBuilder({}, records_path=FieldPath("results"), pagination_strategy=None).build(),
)
@classmethod
@@ -105,10 +115,7 @@ class HubspotTestCase:
record = record_builder().with_field(FieldPath("name"), name).with_field(FieldPath("type"), type)
response_builder = response_builder.with_record(record)
http_mocker.get(
PropertiesRequestBuilder().for_entity(object_type).build(),
response_builder.build()
)
http_mocker.get(PropertiesRequestBuilder().for_entity(object_type).build(), response_builder.build())
@classmethod
def mock_response(cls, http_mocker: HttpMocker, request, responses, method: str = "get"):

View File

@@ -16,7 +16,6 @@ from source_hubspot.streams import (
ContactsMergedAudit,
ContactsWebAnalytics,
CustomObject,
DealPipelines,
Deals,
DealsArchived,
DealSplits,
@@ -96,7 +95,7 @@ def test_updated_at_field_non_exist_handler(requests_mock, common_params, fake_p
(ContactsMergedAudit, "contact", {"updatedAt": "2022-02-25T16:43:11Z"}),
(Deals, "deal", {"updatedAt": "2022-02-25T16:43:11Z"}),
(DealsArchived, "deal", {"archivedAt": "2022-02-25T16:43:11Z"}),
(DealPipelines, "deal", {"updatedAt": 1675121674226}),
("deal_pipelines", "deal", {"updatedAt": 1675121674226}),
(DealSplits, "deal_split", {"updatedAt": "2022-02-25T16:43:11Z"}),
(EmailEvents, "", {"updatedAt": "2022-02-25T16:43:11Z"}),
("email_subscriptions", "", {"updatedAt": "2022-02-25T16:43:11Z"}),