1
0
mirror of synced 2025-12-19 18:14:56 -05:00

chore: destination-hubspot to oss (#64144)

This commit is contained in:
Maxime Carbonneau-Leclerc
2025-07-31 14:51:23 -04:00
committed by GitHub
parent 0bffbca9ab
commit 93c12fa3e0
29 changed files with 2049 additions and 0 deletions

View File

@@ -0,0 +1 @@
# HubSpot Destination

View File

@@ -0,0 +1,69 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/
plugins {
id("application")
id("airbyte-bulk-connector")
}
airbyteBulkConnector {
core = "load"
toolkits = listOf("load-csv", "load-dlq", "load-http", "load-low-code")
cdk = "local"
}
application {
mainClass = "io.airbyte.integrations.destination.hubspot.HubSpotDestination"
applicationDefaultJvmArgs = listOf(
"-XX:+ExitOnOutOfMemoryError", "-XX:MaxRAMPercentage=75.0",
// Uncomment to attach a live profiler.
// "-Djava.rmi.server.hostname=localhost",
// "-Dcom.sun.management.jmxremote=true",
// "-Dcom.sun.management.jmxremote.port=6000",
// "-Dcom.sun.management.jmxremote.rmi.port=6000",
// "-Dcom.sun.management.jmxremote.local.only=false",
// "-Dcom.sun.management.jmxremote.authenticate=false",
// "-Dcom.sun.management.jmxremote.ssl=false",
)
// Uncomment and replace to run locally
//applicationDefaultJvmArgs = listOf("-XX:+ExitOnOutOfMemoryError", "-XX:MaxRAMPercentage=75.0", "--add-opens", "java.base/sun.nio.ch=ALL-UNNAMED", "--add-opens", "java.base/sun.security.action=ALL-UNNAMED", "--add-opens", "java.base/java.lang=ALL-UNNAMED")
}
val junitVersion = "5.11.4"
val testContainersVersion = "1.20.5"
configurations.configureEach {
// Exclude additional SLF4J providers from all classpaths
exclude(mapOf("group" to "org.slf4j", "module" to "slf4j-reload4j"))
}
// Uncomment to run locally
//tasks.run.configure {
// standardInput = System.`in`
//}
dependencies {
implementation("io.github.oshai:kotlin-logging-jvm:7.0.0")
implementation("jakarta.inject:jakarta.inject-api:2.0.1")
implementation("com.github.spotbugs:spotbugs-annotations:4.9.0")
implementation("io.micronaut:micronaut-inject:4.7.12")
implementation("org.apache.commons:commons-lang3:3.17.0")
// FIXME not sure why but these were not picked as transient dependencies so I copied/pasted them from load-http
implementation("com.squareup.okhttp3:okhttp:4.12.0")
implementation("dev.failsafe:failsafe-okhttp:3.3.2")
testImplementation("io.mockk:mockk:1.13.16")
testImplementation("org.junit.jupiter:junit-jupiter-api:$junitVersion")
testImplementation("org.junit.jupiter:junit-jupiter-params:$junitVersion")
testRuntimeOnly("org.junit.jupiter:junit-jupiter-engine:$junitVersion")
integrationTestImplementation("org.testcontainers:mssqlserver:$testContainersVersion")
}
tasks.named<Test>("test") {
systemProperties(mapOf("mockk.junit.extension.keepmocks" to "true", "mockk.junit.extension.requireParallelTesting" to "true"))
}

View File

@@ -0,0 +1 @@
testExecutionConcurrency=-1

View File

@@ -0,0 +1 @@
<svg xmlns="http://www.w3.org/2000/svg" width="250" height="250" fill="none"><path fill="#FF7A59" d="M178.194 90.764V65.867a19.172 19.172 0 0 0 11.054-17.281v-.571c0-10.59-8.584-19.173-19.172-19.173h-.572c-10.588 0-19.172 8.584-19.172 19.173v.57a19.166 19.166 0 0 0 11.054 17.282v24.897a54.29 54.29 0 0 0-25.815 11.366L67.29 48.945a21.41 21.41 0 0 0 .77-5.379 21.602 21.602 0 1 0-21.63 21.56 21.368 21.368 0 0 0 10.638-2.895l67.238 52.321c-12.362 18.674-12.031 43.011.833 61.343l-20.451 20.456a17.56 17.56 0 0 0-5.11-.832c-9.794.008-17.728 7.951-17.726 17.745.003 9.793 7.942 17.731 17.735 17.734 9.794.002 17.736-7.932 17.745-17.726a17.495 17.495 0 0 0-.834-5.11l20.231-20.238c18.076 13.915 42.903 15.114 62.237 3.005 19.333-12.11 29.09-34.972 24.457-57.308-4.632-22.338-22.675-39.434-45.229-42.858Zm-8.386 81.884a27.998 27.998 0 0 1-20.276-7.923 27.995 27.995 0 0 1-6.262-30.94 27.985 27.985 0 0 1 26.538-17.094c15.062.527 27.001 12.886 27.01 27.958.006 15.07-11.921 27.442-26.982 27.984"/></svg>

After

Width:  |  Height:  |  Size: 998 B

View File

@@ -0,0 +1,32 @@
data:
allowedHosts:
hosts:
- api.hubapi.com
registryOverrides:
cloud:
enabled: true
oss:
enabled: true
connectorBuildOptions:
baseImage: docker.io/airbyte/java-connector-base:2.0.1@sha256:ec89bd1a89e825514dd2fc8730ba299a3ae1544580a078df0e35c5202c2085b3
connectorSubtype: api
connectorType: destination
definitionId: c8ccd253-8525-4bbd-801c-f0b84ac71f61
dockerImageTag: 0.0.4
dockerRepository: airbyte/destination-hubspot
githubIssueLabel: destination-hubspot
icon: hubspot.svg
license: MIT
name: HubSpot
releaseStage: alpha
documentationUrl: https://docs.airbyte.com/integrations/destinations/hubspot
tags:
- language:java
ab_internal:
sl: 100
ql: 100
isEnterprise: false
supportLevel: certified
supportsDataActivation: true
supportsRefreshes: true # needed for the CDK to work as it relies on the generation/sync ids to work
metadataSpecVersion: "1.0"

View File

@@ -0,0 +1,3 @@
include = [
"${POE_GIT_DIR}/poe-tasks/gradle-connector-tasks.toml",
]

View File

@@ -0,0 +1,86 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.destination.hubspot
import io.airbyte.cdk.load.check.dlq.DlqChecker
import io.airbyte.cdk.load.command.DestinationCatalog
import io.airbyte.cdk.load.command.DestinationConfiguration
import io.airbyte.cdk.load.http.HttpClient
import io.airbyte.cdk.load.http.authentication.OAuthAuthenticator
import io.airbyte.cdk.load.http.okhttp.AirbyteOkHttpClient
import io.airbyte.cdk.load.lowcode.DeclarativeDestinationFactory
import io.airbyte.cdk.load.pipeline.LoadPipeline
import io.airbyte.cdk.load.write.dlq.DlqPipelineFactory
import io.airbyte.cdk.load.write.object_storage.ObjectLoader
import io.airbyte.integrations.destination.hubspot.http.HubSpotOperationRepository
import io.airbyte.integrations.destination.hubspot.io.airbyte.integrations.destination.hubspot.http.HubSpotObjectTypeIdMapper
import io.micronaut.context.annotation.Factory
import jakarta.inject.Singleton
import okhttp3.OkHttpClient
@Factory
class HubSpotBeanFactory {
@Singleton
fun check(
factory: DeclarativeDestinationFactory<HubSpotConfiguration>,
checker: DlqChecker,
) = factory.createDestinationChecker(checker)
@Singleton
fun factory(config: HubSpotConfiguration): DeclarativeDestinationFactory<HubSpotConfiguration> =
DeclarativeDestinationFactory(config)
@Singleton
fun discover(httpClient: HttpClient): HubSpotDiscoverer {
return HubSpotDiscoverer(HubSpotOperationRepository(httpClient))
}
@Singleton fun getConfig(config: DestinationConfiguration) = config as HubSpotConfiguration
@Singleton
fun getAuthenticator(config: HubSpotConfiguration): OAuthAuthenticator {
when (config.credentials.type) {
"OAuth" -> {
val credentials = config.credentials as OAuthCredentialsConfig
return OAuthAuthenticator(
"https://api.hubapi.com/oauth/v1/token",
credentials.clientId,
credentials.clientSecret,
credentials.refreshToken
)
}
else ->
throw IllegalArgumentException(
"Unsupported authenticator type: ${config.credentials.type}"
)
}
}
@Singleton
fun getHttpClient(authenticator: OAuthAuthenticator): HttpClient {
val okhttpClient: OkHttpClient =
OkHttpClient.Builder().addInterceptor(authenticator).build()
return AirbyteOkHttpClient(okhttpClient)
}
@Singleton
fun objectLoader(): ObjectLoader =
object : ObjectLoader {
override val inputPartitions = 1
override val numPartWorkers = 1
}
@Singleton
fun loadPipeline(
catalog: DestinationCatalog,
httpClient: HttpClient,
dlqPipelineFactory: DlqPipelineFactory,
): LoadPipeline =
dlqPipelineFactory.createPipeline(
HubSpotLoader(httpClient, HubSpotObjectTypeIdMapper(httpClient), catalog)
)
@Singleton fun writer() = HubSpotWriter()
}

View File

@@ -0,0 +1,26 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.destination.hubspot
import io.airbyte.cdk.load.command.DestinationConfiguration
import io.airbyte.cdk.load.command.dlq.ObjectStorageConfig
import io.airbyte.cdk.load.command.dlq.ObjectStorageConfigProvider
sealed interface CredentialsConfig {
val type: String
}
class OAuthCredentialsConfig(
val clientId: String,
val clientSecret: String,
val refreshToken: String
) : CredentialsConfig {
override val type: String = "OAuth"
}
data class HubSpotConfiguration(
val credentials: CredentialsConfig,
override val objectStorageConfig: ObjectStorageConfig,
) : DestinationConfiguration(), ObjectStorageConfigProvider

View File

@@ -0,0 +1,26 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.destination.hubspot
import io.airbyte.cdk.load.command.DestinationConfigurationFactory
import io.airbyte.cdk.load.command.dlq.toObjectStorageConfig
import jakarta.inject.Singleton
@Singleton
class HubSpotConfigurationFactory :
DestinationConfigurationFactory<HubSpotSpecification, HubSpotConfiguration> {
override fun makeWithoutExceptionHandling(pojo: HubSpotSpecification): HubSpotConfiguration {
val oauthCredentials = pojo.credentials as OAuthCredentialsSpec
return HubSpotConfiguration(
credentials =
OAuthCredentialsConfig(
oauthCredentials.clientId,
oauthCredentials.clientSecret,
oauthCredentials.refreshToken
),
objectStorageConfig = pojo.objectStorageConfig.toObjectStorageConfig(),
)
}
}

View File

@@ -0,0 +1,17 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.destination.hubspot
import io.airbyte.cdk.AirbyteDestinationRunner
import io.airbyte.cdk.load.command.aws.AwsToolkitConstants
object HubSpotDestination {
val additionalMicronautEnvs = listOf(AwsToolkitConstants.MICRONAUT_ENVIRONMENT)
@JvmStatic
fun main(args: Array<String>) {
AirbyteDestinationRunner.run(*args, additionalMicronautEnvs = additionalMicronautEnvs)
}
}

View File

@@ -0,0 +1,16 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.destination.hubspot
import io.airbyte.cdk.load.command.DestinationDiscoverCatalog
import io.airbyte.cdk.load.discover.DestinationDiscoverer
import io.airbyte.integrations.destination.hubspot.http.HubSpotOperationRepository
class HubSpotDiscoverer(private val operationRepository: HubSpotOperationRepository) :
DestinationDiscoverer<HubSpotConfiguration> {
override fun discover(config: HubSpotConfiguration): DestinationDiscoverCatalog {
return DestinationDiscoverCatalog(operationRepository.fetchAll())
}
}

View File

@@ -0,0 +1,138 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.destination.hubspot
import com.fasterxml.jackson.databind.JsonNode
import com.fasterxml.jackson.databind.node.ArrayNode
import com.fasterxml.jackson.databind.node.ObjectNode
import io.airbyte.cdk.load.command.DestinationCatalog
import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.cdk.load.discoverer.operation.extract
import io.airbyte.cdk.load.http.HttpClient
import io.airbyte.cdk.load.http.Request
import io.airbyte.cdk.load.http.RequestMethod
import io.airbyte.cdk.load.http.Response
import io.airbyte.cdk.load.http.decoder.JsonDecoder
import io.airbyte.cdk.load.http.getBodyOrEmpty
import io.airbyte.cdk.load.message.DestinationRecordRaw
import io.airbyte.cdk.load.message.StreamKey
import io.airbyte.cdk.load.util.serializeToJsonBytes
import io.airbyte.cdk.load.write.dlq.DlqLoader
import io.airbyte.cdk.util.Jsons
import io.airbyte.integrations.destination.hubspot.io.airbyte.integrations.destination.hubspot.http.HubSpotObjectTypeIdMapper
import io.github.oshai.kotlinlogging.KotlinLogging
private val logger = KotlinLogging.logger {}
class HubSpotState(
private val httpClient: HttpClient,
private val objectDao: HubSpotObjectTypeIdMapper,
private val stream: DestinationStream
) : AutoCloseable {
val requestBody: ObjectNode = Jsons.objectNode()
val batch: ArrayNode = requestBody.putArray("inputs")
val decoder: JsonDecoder = JsonDecoder()
fun accumulate(record: DestinationRecordRaw) {
if (isFull()) {
throw IllegalStateException("Can't add records as the batch is already full")
}
val data: JsonNode = record.asJsonRecord()
val input =
Jsons.objectNode().put("idProperty", getNonNestedMatchingKey()).apply {
this.replace(
"id",
data.extract(
stream.matchingKey ?: throw IllegalStateException("Missing matching key")
)
)
val properties = this.putObject("properties")
data.fields().forEach { (key, value) -> properties.replace(key, value) }
}
batch.add(input)
}
fun isFull(): Boolean = batch.size() >= 100
fun flush(): List<DestinationRecordRaw>? {
logger.info { "Flushing data" }
val response: Response =
httpClient.send(
Request(
method = RequestMethod.POST,
// Note that knowing all the standard object names could improve performance of
// one HTTP request in the case the user does not sync custom objects so this
// could be optimized.
url =
"https://api.hubapi.com/crm/v3/objects/${objectDao.fetchObjectTypeId(stream.destinationObjectName ?: throw IllegalStateException("destinationObjectName required"))}/batch/upsert",
headers = mapOf("Content-Type" to "application/json"),
body = requestBody.serializeToJsonBytes()
)
)
response.use {
return when (response.statusCode) {
200 -> null
207 -> null // FIXME generate dlq record with error from hubspot
else ->
throw IllegalStateException(
"Invalid response with status code ${response.statusCode} while starting ingestion: ${response.getBodyOrEmpty().reader(Charsets.UTF_8).readText()}"
)
}
}
}
override fun close() {}
private fun getNonNestedMatchingKey(): String {
val matchingKey = stream.matchingKey ?: emptyList()
if (matchingKey.isEmpty()) {
throw IllegalStateException(
"In order to perform upserts, a matching key needs to be provided"
)
}
if (matchingKey.size != 1) {
throw IllegalStateException(
"Matching keys for Salesforce need to have only one field but got $matchingKey"
)
}
return matchingKey.get(0)
}
}
class HubSpotLoader(
private val httpClient: HttpClient,
private val objectDao: HubSpotObjectTypeIdMapper,
private val catalog: DestinationCatalog
) : DlqLoader<HubSpotState> {
override fun start(key: StreamKey, part: Int): HubSpotState {
return HubSpotState(
httpClient,
objectDao,
catalog.streams.find { it.mappedDescriptor == key.stream }
?: throw IllegalStateException(
"Could not find stream ${key.stream} as part of the catalog."
),
)
}
override fun accept(
record: DestinationRecordRaw,
state: HubSpotState
): DlqLoader.DlqLoadResult {
state.accumulate(record)
if (state.isFull()) {
val failedRecords = state.flush()
return DlqLoader.Complete(failedRecords)
} else {
return DlqLoader.Incomplete
}
}
override fun finish(state: HubSpotState): DlqLoader.Complete = DlqLoader.Complete(state.flush())
override fun close() {}
}

View File

@@ -0,0 +1,82 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.destination.hubspot
import com.fasterxml.jackson.annotation.JsonProperty
import com.fasterxml.jackson.annotation.JsonPropertyDescription
import com.fasterxml.jackson.annotation.JsonSubTypes
import com.fasterxml.jackson.annotation.JsonTypeInfo
import com.fasterxml.jackson.annotation.JsonValue
import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaInject
import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaTitle
import io.airbyte.cdk.load.command.dlq.ConfigurationSpecificationWithDlq
import io.airbyte.cdk.load.spec.DestinationSpecificationExtension
import io.airbyte.protocol.models.v0.DestinationSyncMode
import jakarta.inject.Singleton
enum class CredentialsType(@get:JsonValue val type: String) {
OAuth("OAuth Credentials"),
}
@JsonTypeInfo(
use = JsonTypeInfo.Id.NAME,
include = JsonTypeInfo.As.EXISTING_PROPERTY,
property = "type"
)
@JsonSubTypes(
JsonSubTypes.Type(value = OAuthCredentialsSpec::class, name = "OAuth"),
)
sealed interface CredentialsSpec {
@get:JsonSchemaTitle("Credentials")
@get:JsonProperty("type")
val credentialsType: CredentialsType
}
class OAuthCredentialsSpec : CredentialsSpec {
override val credentialsType = CredentialsType.OAuth
@get:JsonSchemaTitle("Client ID")
@get:JsonPropertyDescription(
"The Client ID of your HubSpot developer application. See the <a href=\\\"https://legacydocs.hubspot.com/docs/methods/oauth2/oauth2-quickstart\\\">Hubspot docs</a> if you need help finding this ID.",
)
@get:JsonProperty("client_id")
@get:JsonSchemaInject(json = """{"order": 0, "airbyte_secret": true}""")
val clientId: String = ""
@get:JsonSchemaTitle("Client Secret")
@get:JsonPropertyDescription(
"The client secret for your HubSpot developer application. See the <a href=\\\"https://legacydocs.hubspot.com/docs/methods/oauth2/oauth2-quickstart\\\">Hubspot docs</a> if you need help finding this secret.",
)
@get:JsonProperty("client_secret")
@get:JsonSchemaInject(json = """{"order": 1, "airbyte_secret": true}""")
val clientSecret: String = ""
@get:JsonSchemaTitle("Refresh Token")
@get:JsonPropertyDescription(
"Refresh token to renew an expired access token. See the <a href=\\\"https://legacydocs.hubspot.com/docs/methods/oauth2/oauth2-quickstart\\\">Hubspot docs</a> if you need help finding this token.",
)
@get:JsonProperty("refresh_token")
@get:JsonSchemaInject(json = """{"order": 2, "airbyte_secret": true}""")
val refreshToken: String = ""
}
@Singleton
class HubSpotSpecification : ConfigurationSpecificationWithDlq() {
@get:JsonSchemaTitle("Credentials")
@get:JsonPropertyDescription("""Choose how to authenticate to HubSpot.""")
@get:JsonProperty("credentials")
@get:JsonSchemaInject(json = """{"order": 0}""")
val credentials: CredentialsSpec = OAuthCredentialsSpec()
}
@Singleton
class HubSpotSpecificationExtension : DestinationSpecificationExtension {
override val supportedSyncModes =
listOf(
DestinationSyncMode.APPEND,
)
override val supportsIncremental = true
}

View File

@@ -0,0 +1,116 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.destination.hubspot
import io.airbyte.cdk.load.spec.DestinationSpecificationExtender
import io.airbyte.cdk.spec.SpecificationExtender
import io.airbyte.cdk.util.Jsons
import io.airbyte.protocol.models.v0.AdvancedAuth
import io.airbyte.protocol.models.v0.AdvancedAuth.AuthFlowType
import io.airbyte.protocol.models.v0.ConnectorSpecification
import io.airbyte.protocol.models.v0.OAuthConfigSpecification
import io.micronaut.context.annotation.Primary
import jakarta.inject.Singleton
private const val oauthConnectorInputSpecification: String =
"""{
"consent_url": "https://app.hubspot.com/oauth/authorize?{{ client_id_key }}={{ client_id_value }}&{{ redirect_uri_key }}={{ redirect_uri_value | urlencode }}&{{ scope_key }}={{ scope_value | urlencode }}&optional_scope={{ optional_scope | urlencode }}&{{ state_key }}={{ state_value }}&code_challenge={{ state_value | codechallengeS256 }}",
"scope": "crm.schemas.contacts.read crm.schemas.companies.read crm.schemas.deals.read crm.schemas.custom.read",
"optional_scope": "crm.objects.contacts.write crm.objects.companies.write crm.objects.deals.write crm.objects.custom.write",
"access_token_url": "https://api.hubapi.com/oauth/v1/token",
"extract_output": ["access_token", "refresh_token", "expires_in"],
"access_token_headers": {
"Content-Type": "application/x-www-form-urlencoded"
},
"access_token_params": {
"client_id": "{{ client_id_value }}",
"client_secret": "{{ client_secret_value }}",
"code": "{{ auth_code_value }}",
"grant_type": "authorization_code",
"redirect_uri": "{{ redirect_uri_value }}"
}
}"""
private const val completeOauthOutputSpecification: String =
"""{
"type": "object",
"additionalProperties": false,
"properties": {
"refresh_token": {
"type": "string",
"path_in_connector_config": [
"credentials",
"refresh_token"
],
"path_in_oauth_response": ["refresh_token"]
}
}
}
"""
private const val completeOauthServerInputSpecification: String =
"""{
"type": "object",
"additionalProperties": false,
"properties": {
"client_id": {
"type": "string"
},
"client_secret": {
"type": "string"
}
}
}"""
private const val completeOauthServerOutputSpecification: String =
"""{
"type": "object",
"additionalProperties": false,
"properties": {
"client_id": {
"type": "string",
"path_in_connector_config": [
"credentials",
"client_id"
]
},
"client_secret": {
"type": "string",
"path_in_connector_config": [
"credentials",
"client_secret"
]
}
}
}"""
@Singleton
@Primary
class HubSpotSpecificationExtender(private val decorated: DestinationSpecificationExtender) :
SpecificationExtender {
override fun invoke(specification: ConnectorSpecification): ConnectorSpecification {
val advancedAuth =
AdvancedAuth()
.withAuthFlowType(AuthFlowType.OAUTH_2_0)
.withPredicateKey(listOf("credentials", "type"))
.withPredicateValue("OAuth")
.withOauthConfigSpecification(
OAuthConfigSpecification()
.withOauthConnectorInputSpecification(
Jsons.readTree(oauthConnectorInputSpecification)
)
.withCompleteOauthOutputSpecification(
Jsons.readTree(completeOauthOutputSpecification)
)
.withCompleteOauthServerInputSpecification(
Jsons.readTree(completeOauthServerInputSpecification)
)
.withCompleteOauthServerOutputSpecification(
Jsons.readTree(completeOauthServerOutputSpecification)
)
)
return decorated.invoke(specification).withAdvancedAuth(advancedAuth)
}
}

View File

@@ -0,0 +1,10 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.destination.hubspot
import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.cdk.load.write.StreamLoader
class HubSpotStreamLoader(override val stream: DestinationStream) : StreamLoader {}

View File

@@ -0,0 +1,15 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.destination.hubspot
import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.cdk.load.write.DestinationWriter
import io.airbyte.cdk.load.write.StreamLoader
class HubSpotWriter : DestinationWriter {
override fun createStreamLoader(stream: DestinationStream): StreamLoader {
return HubSpotStreamLoader(stream)
}
}

View File

@@ -0,0 +1,66 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.destination.hubspot.io.airbyte.integrations.destination.hubspot.http
import io.airbyte.cdk.load.http.HttpClient
import io.airbyte.cdk.load.http.Request
import io.airbyte.cdk.load.http.RequestMethod
import io.airbyte.cdk.load.http.consumeBodyToString
import io.airbyte.cdk.load.http.decoder.JsonDecoder
import io.airbyte.cdk.load.http.getBodyOrEmpty
/**
* To query the HubSpot API for batch on custom objects, we need the ObjectTypeId as the object name
* does not work like standard objects. This class loads once a mapping of (objectName,
* objectTypeId) for all the customer objects.
*
* Note that this does not fit in the future low-code pattern. In order to solve that, we have two
* potential solutions:
* * Create a low-code component that could fetch additional data during the write command;
* * Assuming this data is always available during the discover command, populate this as part of
* the catalog. This would require a protocol change to have this information flow from the discover
* to the write. It seems like this should be the preferable solution as it's easy to populate a new
* field in the catalog and it wouldn't require re-specifying the query to get the information in
* the write command in low-code.
*/
class HubSpotObjectTypeIdMapper(private val httpClient: HttpClient) {
private val decoder: JsonDecoder = JsonDecoder()
// This assumes there can't be twice the same object name for different object type ids.
// I assume this is not possible because when trying to do so through the UI, I get `An object
// with the singular label <object name> already exists.` and the `Create` button is greyed out.
private var objectTypeIdByObjectName: Map<String, String>? = null
/**
* Note that even if a standard object is passed, the cache will be built. We could optimize
* that by maintaining a list of standard objects here.
*/
fun fetchObjectTypeId(objectName: String): String {
if (objectTypeIdByObjectName == null) {
buildCache()
}
return objectTypeIdByObjectName!![objectName] ?: objectName
}
private fun buildCache() {
httpClient.send(Request(RequestMethod.GET, "https://api.hubapi.com/crm/v3/schemas")).use {
when (it.statusCode) {
200 ->
objectTypeIdByObjectName =
decoder
.decode(it.getBodyOrEmpty())
.get("results")
.asSequence()
.map { it.get("name").asText() to it.get("objectTypeId").asText() }
.toMap()
else -> {
throw IllegalStateException(
"Failed to get the object types from HubSpot API. HTTP response had status ${it.statusCode} and message is: ${it.consumeBodyToString()}",
)
}
}
}
}
}

View File

@@ -0,0 +1,148 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.destination.hubspot.http
import com.fasterxml.jackson.databind.JsonNode
import io.airbyte.cdk.load.command.Dedupe
import io.airbyte.cdk.load.command.DestinationOperation
import io.airbyte.cdk.load.data.BooleanType
import io.airbyte.cdk.load.data.DateType
import io.airbyte.cdk.load.data.NumberType
import io.airbyte.cdk.load.data.StringType
import io.airbyte.cdk.load.data.TimestampTypeWithTimezone
import io.airbyte.cdk.load.discoverer.destinationobject.DynamicDestinationObjectProvider
import io.airbyte.cdk.load.discoverer.destinationobject.StaticDestinationObjectProvider
import io.airbyte.cdk.load.discoverer.operation.CompositeOperationProvider
import io.airbyte.cdk.load.discoverer.operation.DestinationOperationAssembler
import io.airbyte.cdk.load.discoverer.operation.DynamicOperationProvider
import io.airbyte.cdk.load.discoverer.operation.InsertionMethod
import io.airbyte.cdk.load.discoverer.operation.JsonNodePredicate
import io.airbyte.cdk.load.discoverer.operation.OperationProvider
import io.airbyte.cdk.load.http.HttpClient
import io.airbyte.cdk.load.http.HttpRequester
import io.airbyte.cdk.load.http.RequestMethod
import io.airbyte.cdk.load.http.Retriever
import java.util.function.Predicate
import kotlin.collections.List
/*
HubSpot process in a way that fetching schemas for standard objects is different from custom ones. This community member documented a list of standard objects: https://community.hubspot.com/t5/APIs-Integrations/Object-Schemas-GET-All-Custom-and-Standard/m-p/881573/highlight/true#M69167
*/
class HubSpotOperationRepository(
httpClient: HttpClient,
) {
private val operationProvider: OperationProvider =
CompositeOperationProvider(
listOf(
DynamicOperationProvider(
objectsSupplier = StaticDestinationObjectProvider(listOf("CONTACT")),
operationAssembler =
DestinationOperationAssembler(
propertiesPath = PROPERTIES_PATH,
insertionMethods =
listOf(
InsertionMethod(
importType = upsertOperation(),
namePath = PROPERTY_NAME_PATH,
typePath = PROPERTY_TYPE_PATH,
matchingKeyPredicate =
JsonNodePredicate(
"""{{ property["name"] == "email" }}"""
),
availabilityPredicate = UPSERT_AVAILABILITY_PREDICATE,
requiredPredicate = NEVER_REQUIRED_PREDICATE,
typeMapper = TYPE_MAPPER,
),
),
schemaRequester =
HttpRequester(
httpClient,
RequestMethod.GET,
STANDARD_OBJECT_SCHEMA_URL
),
)
),
DynamicOperationProvider(
objectsSupplier = StaticDestinationObjectProvider(listOf("COMPANY", "DEAL")),
operationAssembler =
DestinationOperationAssembler(
propertiesPath = PROPERTIES_PATH,
insertionMethods = listOf(UPSERT_UNIQUE_VALUE_INSERTION_METHOD),
schemaRequester =
HttpRequester(
httpClient,
RequestMethod.GET,
STANDARD_OBJECT_SCHEMA_URL
),
)
),
DynamicOperationProvider(
objectsSupplier =
DynamicDestinationObjectProvider(
retriever =
Retriever(
requester =
HttpRequester(
httpClient,
RequestMethod.GET,
"https://api.hubapi.com/crm/v3/schemas"
),
selector = listOf("results"),
),
namePath = OBJECT_NAME_PATH,
),
operationAssembler =
DestinationOperationAssembler(
propertiesPath = PROPERTIES_PATH,
insertionMethods = listOf(UPSERT_UNIQUE_VALUE_INSERTION_METHOD),
schemaRequester = null,
),
),
)
)
companion object {
val TYPE_MAPPER =
mapOf(
"string" to StringType,
"enumeration" to StringType,
"phone_number" to StringType,
"number" to NumberType,
"bool" to BooleanType,
"date" to DateType,
"datetime" to TimestampTypeWithTimezone,
)
val OBJECT_NAME_PATH = listOf("name")
val PROPERTIES_PATH = listOf("properties")
val PROPERTY_NAME_PATH = listOf("name")
val PROPERTY_TYPE_PATH = listOf("type")
val UPSERT_AVAILABILITY_PREDICATE =
JsonNodePredicate(
"""{{ property["type"] != "object_coordinates" && property["modificationMetadata"]["readOnlyValue"] == false && property["calculated"] == false }}"""
)
val NEVER_REQUIRED_PREDICATE: Predicate<JsonNode> = Predicate { _ -> false }
const val STANDARD_OBJECT_SCHEMA_URL =
"""https://api.hubapi.com/crm/v3/schemas/{{ object["name"] }}"""
val UPSERT_UNIQUE_VALUE_INSERTION_METHOD =
InsertionMethod(
importType = upsertOperation(),
namePath = PROPERTY_NAME_PATH,
typePath = PROPERTY_TYPE_PATH,
matchingKeyPredicate =
JsonNodePredicate(
"""{{ property["hasUniqueValue"] && property["modificationMetadata"]["readOnlyValue"] == false }}"""
),
availabilityPredicate = UPSERT_AVAILABILITY_PREDICATE,
requiredPredicate = NEVER_REQUIRED_PREDICATE,
typeMapper = TYPE_MAPPER,
)
private fun upsertOperation(): Dedupe = Dedupe(emptyList(), emptyList())
}
fun fetchAll(): List<DestinationOperation> {
return operationProvider.get()
}
}

View File

@@ -0,0 +1,12 @@
checker:
type: HttpRequestChecker
requester:
type: HttpRequester
url: https://api.hubapi.com/crm/v3/schemas/contact
method: GET
authenticator:
type: OAuthAuthenticator
url: https://api.hubapi.com/oauth/v1/token
client_id: "{{ config.credentials.clientId }}"
client_secret: "{{ config.credentials.clientSecret }}"
refresh_token: "{{ config.credentials.refreshToken }}"

View File

@@ -0,0 +1,26 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/
import io.airbyte.cdk.load.check.CheckIntegrationTest
import io.airbyte.cdk.load.check.CheckTestConfig
import io.airbyte.integrations.destination.hubspot.HubSpotSpecification
import java.nio.file.Files
import java.nio.file.Path
import java.util.regex.Pattern
class HubSpotCheckTest :
CheckIntegrationTest<HubSpotSpecification>(
successConfigFilenames =
listOf(
CheckTestConfig(
configContents = Files.readString(Path.of("secrets/config.json")),
)
),
failConfigFilenamesAndFailureReasons =
mapOf(
CheckTestConfig(
configContents = Files.readString(Path.of("secrets/invalid-config.json"))
) to Pattern.compile("Response from server is.*"),
),
)

View File

@@ -0,0 +1,19 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/
import io.airbyte.cdk.load.discover.DiscoverIntegrationTest
import io.airbyte.cdk.load.discover.DiscoverTestConfig
import io.airbyte.integrations.destination.hubspot.HubSpotSpecification
import java.nio.file.Files
import java.nio.file.Path
class HubSpotDiscoverTest :
DiscoverIntegrationTest<HubSpotSpecification>(
successConfigFilenames =
listOf(
DiscoverTestConfig(
configContents = Files.readString(Path.of("secrets/config.json")),
)
),
)

View File

@@ -0,0 +1,7 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/
import io.airbyte.cdk.load.spec.SpecTest
class HubSpotSpecTest : SpecTest()

View File

@@ -0,0 +1,227 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/
import io.airbyte.cdk.command.ConfigurationSpecification
import io.airbyte.cdk.command.ValidatedJsonUtils
import io.airbyte.cdk.load.command.DestinationCatalog
import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.cdk.load.command.NamespaceMapper
import io.airbyte.cdk.load.command.Update
import io.airbyte.cdk.load.data.BooleanType
import io.airbyte.cdk.load.data.FieldType
import io.airbyte.cdk.load.data.NumberType
import io.airbyte.cdk.load.data.ObjectType
import io.airbyte.cdk.load.data.StringType
import io.airbyte.cdk.load.message.InputRecord
import io.airbyte.cdk.load.message.InputStreamCheckpoint
import io.airbyte.cdk.load.message.Meta.Change
import io.airbyte.cdk.load.test.util.DestinationCleaner
import io.airbyte.cdk.load.test.util.DestinationDataDumper
import io.airbyte.cdk.load.test.util.OutputRecord
import io.airbyte.cdk.load.write.BasicFunctionalityIntegrationTest
import io.airbyte.cdk.load.write.SchematizedNestedValueBehavior
import io.airbyte.cdk.load.write.UnionBehavior
import io.airbyte.cdk.load.write.Untyped
import io.airbyte.integrations.destination.hubspot.HubSpotSpecification
import io.airbyte.protocol.models.v0.AirbyteMessage
import io.airbyte.protocol.models.v0.AirbyteRecordMessageMetaChange
import java.nio.file.Files
import java.nio.file.Path
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertAll
class HubSpotDataDumper : DestinationDataDumper {
override fun dumpRecords(
spec: ConfigurationSpecification,
stream: DestinationStream
): List<OutputRecord> {
TODO("Not yet implemented")
}
override fun dumpFile(
spec: ConfigurationSpecification,
stream: DestinationStream
): Map<String, String> {
TODO("Not yet implemented")
}
}
object HubSpotDataCleaner : DestinationCleaner {
override fun cleanup() {}
}
class HubSpotWriterTest() :
BasicFunctionalityIntegrationTest(
configContents = Files.readString(Path.of("secrets/config.json")),
configSpecClass = HubSpotSpecification::class.java,
dataDumper = HubSpotDataDumper(),
destinationCleaner = HubSpotDataCleaner,
commitDataIncrementally = true,
allTypesBehavior = Untyped,
verifyDataWriting = false,
isStreamSchemaRetroactive = false,
dedupBehavior = null,
stringifySchemalessObjects = true,
schematizedArrayBehavior = SchematizedNestedValueBehavior.STRINGIFY,
schematizedObjectBehavior = SchematizedNestedValueBehavior.STRINGIFY,
unionBehavior = UnionBehavior.STRINGIFY,
supportFileTransfer = false,
) {
private val contactStream: DestinationStream =
DestinationStream(
randomizedNamespace,
"test_stream_contact",
Update,
ObjectType(
linkedMapOf(
"email" to FieldType(StringType, nullable = true),
"hs_enriched_email_bounce_detected" to FieldType(BooleanType, nullable = true)
)
),
generationId = 0,
minimumGenerationId = 0,
syncId = 42,
destinationObjectName = "CONTACT",
namespaceMapper = NamespaceMapper(),
matchingKey = listOf("email"),
)
private val companyStream: DestinationStream =
DestinationStream(
randomizedNamespace,
"test_stream_companies",
Update,
ObjectType(
linkedMapOf(
"retl_identifier" to FieldType(StringType, nullable = true),
"about_us" to FieldType(StringType, nullable = true)
)
),
generationId = 0,
minimumGenerationId = 0,
syncId = 42,
destinationObjectName = "COMPANY",
namespaceMapper = NamespaceMapper(),
matchingKey = listOf("retl_identifier"),
)
private val carsStream: DestinationStream =
DestinationStream(
randomizedNamespace,
"test_stream_cars",
Update,
ObjectType(
linkedMapOf(
"car_id" to FieldType(StringType, nullable = true),
"hs_object_source_detail_1" to FieldType(NumberType, nullable = true)
)
),
generationId = 0,
minimumGenerationId = 0,
syncId = 42,
destinationObjectName = "cars",
namespaceMapper = NamespaceMapper(),
matchingKey = listOf("car_id"),
)
fun contactRecord(email: String) =
InputRecord(
stream = contactStream,
data = """{"email": "$email", "hs_clicked_linkedin_ad": "false"}""",
emittedAtMs = 1234,
)
fun companyRecord(retlIdentifier: String) =
InputRecord(
stream = companyStream,
data =
"""{"retl_identifier": "$retlIdentifier", "about_us": "This about_us has been generated by integration tests"}""",
emittedAtMs = 1234,
)
fun carsRecord(carId: Int) =
InputRecord(
stream = carsStream,
data = """{"car_id": $carId, "hubspot_owner_id": "52550153"}""",
emittedAtMs = 1234,
)
@Test
override fun testBasicWrite() {
val messages =
runSync(
updatedConfig,
DestinationCatalog(listOf(contactStream, companyStream, carsStream)),
listOf(
contactRecord("mcl.retl.test@airbyte-x.com"),
companyRecord("retl_id"),
carsRecord(1000),
InputStreamCheckpoint(
unmappedNamespace = contactStream.mappedDescriptor.namespace,
unmappedName = contactStream.mappedDescriptor.name,
blob = """{"foo": "bar"}""",
sourceRecordCount = 1,
),
InputStreamCheckpoint(
unmappedNamespace = companyStream.mappedDescriptor.namespace,
unmappedName = companyStream.mappedDescriptor.name,
blob = """{"foo": "bar"}""",
sourceRecordCount = 1,
),
InputStreamCheckpoint(
unmappedNamespace = carsStream.mappedDescriptor.namespace,
unmappedName = carsStream.mappedDescriptor.name,
blob = """{"foo": "bar"}""",
sourceRecordCount = 1,
)
),
)
val stateMessages = messages.filter { it.type == AirbyteMessage.Type.STATE }
assertAll(
{
assertEquals(
3,
stateMessages.size,
"Expected to receive exactly three state messages (one for each stream), got ${stateMessages.size} ($stateMessages)"
)
},
{
if (verifyDataWriting) {
dumpAndDiffRecords(
ValidatedJsonUtils.parseOne(configSpecClass, updatedConfig),
listOf(
OutputRecord(
extractedAt = 1234,
generationId = 0,
data = mapOf("id" to 5678),
airbyteMeta =
OutputRecord.Meta(
changes =
mutableListOf(
Change(
field = "foo",
change =
AirbyteRecordMessageMetaChange.Change
.NULLED,
reason =
AirbyteRecordMessageMetaChange.Reason
.SOURCE_FIELD_SIZE_LIMITATION
)
),
syncId = 42
)
)
),
contactStream,
primaryKey = listOf(listOf("email")),
cursor = null,
)
}
},
)
}
}

View File

@@ -0,0 +1,15 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/
package bean
import io.airbyte.cdk.load.MockObjectStorageClient
import io.airbyte.cdk.load.file.object_storage.ObjectStorageClient
import io.micronaut.context.annotation.Factory
import jakarta.inject.Singleton
@Factory
class HubSpotTestingOverrideFactory {
@Singleton fun objectClient(): ObjectStorageClient<*> = MockObjectStorageClient()
}

View File

@@ -0,0 +1,292 @@
{
"documentationUrl" : "https://docs.airbyte.com/integrations/destinations/hubspot",
"connectionSpecification" : {
"$schema" : "http://json-schema.org/draft-07/schema#",
"title" : "Hub Spot Specification",
"type" : "object",
"additionalProperties" : true,
"properties" : {
"object_storage_config" : {
"oneOf" : [ {
"type" : "object",
"additionalProperties" : true,
"properties" : {
"storage_type" : {
"type" : "string",
"enum" : [ "None" ],
"default" : "None"
}
},
"title" : "None",
"required" : [ "storage_type" ]
}, {
"type" : "object",
"additionalProperties" : true,
"properties" : {
"storage_type" : {
"type" : "string",
"enum" : [ "S3" ],
"default" : "S3"
},
"access_key_id" : {
"type" : "string",
"description" : "The access key ID to access the S3 bucket. Airbyte requires Read and Write permissions to the given bucket. Read more <a href=\"https://docs.aws.amazon.com/general/latest/gr/aws-sec-cred-types.html#access-keys-and-secret-access-keys\">here</a>.",
"title" : "Access Key ID",
"examples" : [ "A012345678910EXAMPLE" ],
"airbyte_secret" : true,
"always_show" : true,
"order" : 1
},
"secret_access_key" : {
"type" : "string",
"description" : "The corresponding secret to the access key ID. Read more <a href=\"https://docs.aws.amazon.com/general/latest/gr/aws-sec-cred-types.html#access-keys-and-secret-access-keys\">here</a>",
"title" : "Secret Access Key",
"examples" : [ "a012345678910ABCDEFGH/AbCdEfGhEXAMPLEKEY" ],
"airbyte_secret" : true,
"always_show" : true,
"order" : 2
},
"role_arn" : {
"type" : "string",
"description" : "The ARN of the AWS role to assume. Only usable in Airbyte Cloud.",
"title" : "Role ARN",
"examples" : [ "arn:aws:iam::123456789:role/ExternalIdIsYourWorkspaceId" ],
"order" : 3
},
"s3_bucket_name" : {
"type" : "string",
"description" : "The name of the S3 bucket. Read more <a href=\"https://docs.aws.amazon.com/AmazonS3/latest/userguide/create-bucket-overview.html\">here</a>.",
"title" : "S3 Bucket Name",
"examples" : [ "airbyte_sync" ],
"order" : 4
},
"s3_bucket_region" : {
"type" : "string",
"enum" : [ "", "af-south-1", "ap-east-1", "ap-northeast-1", "ap-northeast-2", "ap-northeast-3", "ap-south-1", "ap-south-2", "ap-southeast-1", "ap-southeast-2", "ap-southeast-3", "ap-southeast-4", "ca-central-1", "ca-west-1", "cn-north-1", "cn-northwest-1", "eu-central-1", "eu-central-2", "eu-north-1", "eu-south-1", "eu-south-2", "eu-west-1", "eu-west-2", "eu-west-3", "il-central-1", "me-central-1", "me-south-1", "sa-east-1", "us-east-1", "us-east-2", "us-gov-east-1", "us-gov-west-1", "us-west-1", "us-west-2" ],
"description" : "The region of the S3 bucket. See <a href=\"https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/using-regions-availability-zones.html#concepts-available-regions\">here</a> for all region codes.",
"title" : "S3 Bucket Region",
"examples" : [ "us-east-1" ],
"order" : 5,
"default" : ""
},
"s3_endpoint" : {
"type" : "string",
"description" : "Your S3 endpoint url. Read more <a href=\"https://docs.aws.amazon.com/general/latest/gr/s3.html#:~:text=Service%20endpoints-,Amazon%20S3%20endpoints,-When%20you%20use\">here</a>",
"title" : "S3 Endpoint",
"examples" : [ "http://localhost:9000" ],
"order" : 6
},
"bucket_path" : {
"type" : "string",
"description" : "All files in the bucket will be prefixed by this.",
"title" : "Prefix Path in the Bucket",
"examples" : [ "prefix/" ],
"order" : 7
},
"format" : {
"oneOf" : [ {
"title" : "CSV: Comma-Separated Values",
"type" : "object",
"additionalProperties" : true,
"properties" : {
"format_type" : {
"type" : "string",
"enum" : [ "CSV" ],
"default" : "CSV"
},
"flattening" : {
"type" : "string",
"default" : "No flattening",
"enum" : [ "No flattening", "Root level flattening" ],
"title" : "Flattening"
}
},
"required" : [ "format_type", "flattening" ]
}, {
"title" : "JSON Lines: Newline-delimited JSON",
"type" : "object",
"additionalProperties" : true,
"properties" : {
"format_type" : {
"type" : "string",
"enum" : [ "JSONL" ],
"default" : "JSONL"
},
"flattening" : {
"type" : "string",
"default" : "No flattening",
"enum" : [ "No flattening", "Root level flattening" ],
"title" : "Flattening"
}
},
"required" : [ "format_type" ]
} ],
"description" : "Format of the data output.",
"title" : "Output Format",
"examples" : [ "CSV", "JSONL" ],
"default" : "CSV",
"airbyte_hidden" : true,
"order" : 8,
"type" : "object"
},
"compression" : {
"oneOf" : [ {
"title" : "No Compression",
"type" : "object",
"additionalProperties" : true,
"properties" : {
"compression_type" : {
"type" : "string",
"enum" : [ "No Compression" ],
"default" : "No Compression"
}
},
"required" : [ "compression_type" ]
}, {
"title" : "GZIP",
"type" : "object",
"additionalProperties" : true,
"properties" : {
"compression_type" : {
"type" : "string",
"enum" : [ "GZIP" ],
"default" : "GZIP"
}
},
"required" : [ "compression_type" ]
} ],
"description" : "Whether the output files should be compressed. If compression is selected, the output filename will have an extra extension (GZIP: \".jsonl.gz\").",
"title" : "Compression",
"airbyte_hidden" : true,
"type" : "object"
},
"path_format" : {
"type" : "string",
"examples" : [ "{namespace}/{stream_name}/{year}_{month}_{day}_{epoch}" ],
"default" : "{sync_id}/{namespace}/{stream_name}/",
"airbyte_hidden" : true,
"order" : 10
},
"file_name_format" : {
"type" : "string",
"examples" : [ "{date}", "{date:yyyy_MM}", "{timestamp}", "{part_number}", "{sync_id}" ],
"default" : "{date}_{timestamp}_{part_number}{format_extension}",
"airbyte_hidden" : true,
"order" : 11
}
},
"title" : "S3",
"required" : [ "storage_type", "s3_bucket_name", "s3_bucket_region", "bucket_path", "format" ]
} ],
"title" : "Object Storage Configuration",
"type" : "object"
},
"credentials" : {
"oneOf" : [ {
"type" : "object",
"additionalProperties" : true,
"properties" : {
"type" : {
"type" : "string",
"enum" : [ "OAuth" ],
"default" : "OAuth"
},
"client_id" : {
"type" : "string",
"description" : "The Client ID of your HubSpot developer application. See the <a href=\\\"https://legacydocs.hubspot.com/docs/methods/oauth2/oauth2-quickstart\\\">Hubspot docs</a> if you need help finding this ID.",
"title" : "Client ID",
"order" : 0,
"airbyte_secret" : true
},
"client_secret" : {
"type" : "string",
"description" : "The client secret for your HubSpot developer application. See the <a href=\\\"https://legacydocs.hubspot.com/docs/methods/oauth2/oauth2-quickstart\\\">Hubspot docs</a> if you need help finding this secret.",
"title" : "Client Secret",
"order" : 1,
"airbyte_secret" : true
},
"refresh_token" : {
"type" : "string",
"description" : "Refresh token to renew an expired access token. See the <a href=\\\"https://legacydocs.hubspot.com/docs/methods/oauth2/oauth2-quickstart\\\">Hubspot docs</a> if you need help finding this token.",
"title" : "Refresh Token",
"order" : 2,
"airbyte_secret" : true
}
},
"title" : "OAuth",
"required" : [ "type", "client_id", "client_secret", "refresh_token" ]
} ],
"description" : "Choose how to authenticate to HubSpot.",
"title" : "Credentials",
"order" : 0,
"type" : "object"
}
},
"required" : [ "credentials" ]
},
"supportsIncremental" : true,
"supportsNormalization" : false,
"supportsDBT" : false,
"supported_destination_sync_modes" : [ "append" ],
"advanced_auth" : {
"auth_flow_type" : "oauth2.0",
"predicate_key" : [ "credentials", "type" ],
"predicate_value" : "OAuth",
"oauth_config_specification" : {
"oauth_connector_input_specification" : {
"consent_url" : "https://app.hubspot.com/oauth/authorize?{{ client_id_key }}={{ client_id_value }}&{{ redirect_uri_key }}={{ redirect_uri_value | urlencode }}&{{ scope_key }}={{ scope_value | urlencode }}&optional_scope={{ optional_scope | urlencode }}&{{ state_key }}={{ state_value }}&code_challenge={{ state_value | codechallengeS256 }}",
"scope" : "crm.schemas.contacts.read crm.schemas.companies.read crm.schemas.deals.read crm.schemas.custom.read",
"optional_scope" : "crm.objects.contacts.write crm.objects.companies.write crm.objects.deals.write crm.objects.custom.write",
"access_token_url" : "https://api.hubapi.com/oauth/v1/token",
"extract_output" : [ "access_token", "refresh_token", "expires_in" ],
"access_token_headers" : {
"Content-Type" : "application/x-www-form-urlencoded"
},
"access_token_params" : {
"client_id" : "{{ client_id_value }}",
"client_secret" : "{{ client_secret_value }}",
"code" : "{{ auth_code_value }}",
"grant_type" : "authorization_code",
"redirect_uri" : "{{ redirect_uri_value }}"
}
},
"complete_oauth_output_specification" : {
"type" : "object",
"additionalProperties" : false,
"properties" : {
"refresh_token" : {
"type" : "string",
"path_in_connector_config" : [ "credentials", "refresh_token" ],
"path_in_oauth_response" : [ "refresh_token" ]
}
}
},
"complete_oauth_server_input_specification" : {
"type" : "object",
"additionalProperties" : false,
"properties" : {
"client_id" : {
"type" : "string"
},
"client_secret" : {
"type" : "string"
}
}
},
"complete_oauth_server_output_specification" : {
"type" : "object",
"additionalProperties" : false,
"properties" : {
"client_id" : {
"type" : "string",
"path_in_connector_config" : [ "credentials", "client_id" ]
},
"client_secret" : {
"type" : "string",
"path_in_connector_config" : [ "credentials", "client_secret" ]
}
}
}
}
}
}

View File

@@ -0,0 +1,292 @@
{
"documentationUrl" : "https://docs.airbyte.com/integrations/destinations/hubspot",
"connectionSpecification" : {
"$schema" : "http://json-schema.org/draft-07/schema#",
"title" : "Hub Spot Specification",
"type" : "object",
"additionalProperties" : true,
"properties" : {
"object_storage_config" : {
"oneOf" : [ {
"type" : "object",
"additionalProperties" : true,
"properties" : {
"storage_type" : {
"type" : "string",
"enum" : [ "None" ],
"default" : "None"
}
},
"title" : "None",
"required" : [ "storage_type" ]
}, {
"type" : "object",
"additionalProperties" : true,
"properties" : {
"storage_type" : {
"type" : "string",
"enum" : [ "S3" ],
"default" : "S3"
},
"access_key_id" : {
"type" : "string",
"description" : "The access key ID to access the S3 bucket. Airbyte requires Read and Write permissions to the given bucket. Read more <a href=\"https://docs.aws.amazon.com/general/latest/gr/aws-sec-cred-types.html#access-keys-and-secret-access-keys\">here</a>.",
"title" : "Access Key ID",
"examples" : [ "A012345678910EXAMPLE" ],
"airbyte_secret" : true,
"always_show" : true,
"order" : 1
},
"secret_access_key" : {
"type" : "string",
"description" : "The corresponding secret to the access key ID. Read more <a href=\"https://docs.aws.amazon.com/general/latest/gr/aws-sec-cred-types.html#access-keys-and-secret-access-keys\">here</a>",
"title" : "Secret Access Key",
"examples" : [ "a012345678910ABCDEFGH/AbCdEfGhEXAMPLEKEY" ],
"airbyte_secret" : true,
"always_show" : true,
"order" : 2
},
"role_arn" : {
"type" : "string",
"description" : "The ARN of the AWS role to assume. Only usable in Airbyte Cloud.",
"title" : "Role ARN",
"examples" : [ "arn:aws:iam::123456789:role/ExternalIdIsYourWorkspaceId" ],
"order" : 3
},
"s3_bucket_name" : {
"type" : "string",
"description" : "The name of the S3 bucket. Read more <a href=\"https://docs.aws.amazon.com/AmazonS3/latest/userguide/create-bucket-overview.html\">here</a>.",
"title" : "S3 Bucket Name",
"examples" : [ "airbyte_sync" ],
"order" : 4
},
"s3_bucket_region" : {
"type" : "string",
"enum" : [ "", "af-south-1", "ap-east-1", "ap-northeast-1", "ap-northeast-2", "ap-northeast-3", "ap-south-1", "ap-south-2", "ap-southeast-1", "ap-southeast-2", "ap-southeast-3", "ap-southeast-4", "ca-central-1", "ca-west-1", "cn-north-1", "cn-northwest-1", "eu-central-1", "eu-central-2", "eu-north-1", "eu-south-1", "eu-south-2", "eu-west-1", "eu-west-2", "eu-west-3", "il-central-1", "me-central-1", "me-south-1", "sa-east-1", "us-east-1", "us-east-2", "us-gov-east-1", "us-gov-west-1", "us-west-1", "us-west-2" ],
"description" : "The region of the S3 bucket. See <a href=\"https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/using-regions-availability-zones.html#concepts-available-regions\">here</a> for all region codes.",
"title" : "S3 Bucket Region",
"examples" : [ "us-east-1" ],
"order" : 5,
"default" : ""
},
"s3_endpoint" : {
"type" : "string",
"description" : "Your S3 endpoint url. Read more <a href=\"https://docs.aws.amazon.com/general/latest/gr/s3.html#:~:text=Service%20endpoints-,Amazon%20S3%20endpoints,-When%20you%20use\">here</a>",
"title" : "S3 Endpoint",
"examples" : [ "http://localhost:9000" ],
"order" : 6
},
"bucket_path" : {
"type" : "string",
"description" : "All files in the bucket will be prefixed by this.",
"title" : "Prefix Path in the Bucket",
"examples" : [ "prefix/" ],
"order" : 7
},
"format" : {
"oneOf" : [ {
"title" : "CSV: Comma-Separated Values",
"type" : "object",
"additionalProperties" : true,
"properties" : {
"format_type" : {
"type" : "string",
"enum" : [ "CSV" ],
"default" : "CSV"
},
"flattening" : {
"type" : "string",
"default" : "No flattening",
"enum" : [ "No flattening", "Root level flattening" ],
"title" : "Flattening"
}
},
"required" : [ "format_type", "flattening" ]
}, {
"title" : "JSON Lines: Newline-delimited JSON",
"type" : "object",
"additionalProperties" : true,
"properties" : {
"format_type" : {
"type" : "string",
"enum" : [ "JSONL" ],
"default" : "JSONL"
},
"flattening" : {
"type" : "string",
"default" : "No flattening",
"enum" : [ "No flattening", "Root level flattening" ],
"title" : "Flattening"
}
},
"required" : [ "format_type" ]
} ],
"description" : "Format of the data output.",
"title" : "Output Format",
"examples" : [ "CSV", "JSONL" ],
"default" : "CSV",
"airbyte_hidden" : true,
"order" : 8,
"type" : "object"
},
"compression" : {
"oneOf" : [ {
"title" : "No Compression",
"type" : "object",
"additionalProperties" : true,
"properties" : {
"compression_type" : {
"type" : "string",
"enum" : [ "No Compression" ],
"default" : "No Compression"
}
},
"required" : [ "compression_type" ]
}, {
"title" : "GZIP",
"type" : "object",
"additionalProperties" : true,
"properties" : {
"compression_type" : {
"type" : "string",
"enum" : [ "GZIP" ],
"default" : "GZIP"
}
},
"required" : [ "compression_type" ]
} ],
"description" : "Whether the output files should be compressed. If compression is selected, the output filename will have an extra extension (GZIP: \".jsonl.gz\").",
"title" : "Compression",
"airbyte_hidden" : true,
"type" : "object"
},
"path_format" : {
"type" : "string",
"examples" : [ "{namespace}/{stream_name}/{year}_{month}_{day}_{epoch}" ],
"default" : "{sync_id}/{namespace}/{stream_name}/",
"airbyte_hidden" : true,
"order" : 10
},
"file_name_format" : {
"type" : "string",
"examples" : [ "{date}", "{date:yyyy_MM}", "{timestamp}", "{part_number}", "{sync_id}" ],
"default" : "{date}_{timestamp}_{part_number}{format_extension}",
"airbyte_hidden" : true,
"order" : 11
}
},
"title" : "S3",
"required" : [ "storage_type", "s3_bucket_name", "s3_bucket_region", "bucket_path", "format" ]
} ],
"title" : "Object Storage Configuration",
"type" : "object"
},
"credentials" : {
"oneOf" : [ {
"type" : "object",
"additionalProperties" : true,
"properties" : {
"type" : {
"type" : "string",
"enum" : [ "OAuth" ],
"default" : "OAuth"
},
"client_id" : {
"type" : "string",
"description" : "The Client ID of your HubSpot developer application. See the <a href=\\\"https://legacydocs.hubspot.com/docs/methods/oauth2/oauth2-quickstart\\\">Hubspot docs</a> if you need help finding this ID.",
"title" : "Client ID",
"order" : 0,
"airbyte_secret" : true
},
"client_secret" : {
"type" : "string",
"description" : "The client secret for your HubSpot developer application. See the <a href=\\\"https://legacydocs.hubspot.com/docs/methods/oauth2/oauth2-quickstart\\\">Hubspot docs</a> if you need help finding this secret.",
"title" : "Client Secret",
"order" : 1,
"airbyte_secret" : true
},
"refresh_token" : {
"type" : "string",
"description" : "Refresh token to renew an expired access token. See the <a href=\\\"https://legacydocs.hubspot.com/docs/methods/oauth2/oauth2-quickstart\\\">Hubspot docs</a> if you need help finding this token.",
"title" : "Refresh Token",
"order" : 2,
"airbyte_secret" : true
}
},
"title" : "OAuth",
"required" : [ "type", "client_id", "client_secret", "refresh_token" ]
} ],
"description" : "Choose how to authenticate to HubSpot.",
"title" : "Credentials",
"order" : 0,
"type" : "object"
}
},
"required" : [ "credentials" ]
},
"supportsIncremental" : true,
"supportsNormalization" : false,
"supportsDBT" : false,
"supported_destination_sync_modes" : [ "append" ],
"advanced_auth" : {
"auth_flow_type" : "oauth2.0",
"predicate_key" : [ "credentials", "type" ],
"predicate_value" : "OAuth",
"oauth_config_specification" : {
"oauth_connector_input_specification" : {
"consent_url" : "https://app.hubspot.com/oauth/authorize?{{ client_id_key }}={{ client_id_value }}&{{ redirect_uri_key }}={{ redirect_uri_value | urlencode }}&{{ scope_key }}={{ scope_value | urlencode }}&optional_scope={{ optional_scope | urlencode }}&{{ state_key }}={{ state_value }}&code_challenge={{ state_value | codechallengeS256 }}",
"scope" : "crm.schemas.contacts.read crm.schemas.companies.read crm.schemas.deals.read crm.schemas.custom.read",
"optional_scope" : "crm.objects.contacts.write crm.objects.companies.write crm.objects.deals.write crm.objects.custom.write",
"access_token_url" : "https://api.hubapi.com/oauth/v1/token",
"extract_output" : [ "access_token", "refresh_token", "expires_in" ],
"access_token_headers" : {
"Content-Type" : "application/x-www-form-urlencoded"
},
"access_token_params" : {
"client_id" : "{{ client_id_value }}",
"client_secret" : "{{ client_secret_value }}",
"code" : "{{ auth_code_value }}",
"grant_type" : "authorization_code",
"redirect_uri" : "{{ redirect_uri_value }}"
}
},
"complete_oauth_output_specification" : {
"type" : "object",
"additionalProperties" : false,
"properties" : {
"refresh_token" : {
"type" : "string",
"path_in_connector_config" : [ "credentials", "refresh_token" ],
"path_in_oauth_response" : [ "refresh_token" ]
}
}
},
"complete_oauth_server_input_specification" : {
"type" : "object",
"additionalProperties" : false,
"properties" : {
"client_id" : {
"type" : "string"
},
"client_secret" : {
"type" : "string"
}
}
},
"complete_oauth_server_output_specification" : {
"type" : "object",
"additionalProperties" : false,
"properties" : {
"client_id" : {
"type" : "string",
"path_in_connector_config" : [ "credentials", "client_id" ]
},
"client_secret" : {
"type" : "string",
"path_in_connector_config" : [ "credentials", "client_secret" ]
}
}
}
}
}
}

View File

@@ -0,0 +1,209 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.destination.hubspot.http
import io.airbyte.cdk.load.command.Dedupe
import io.airbyte.cdk.load.command.DestinationOperation
import io.airbyte.cdk.load.data.ObjectType
import io.airbyte.cdk.load.http.HttpClient
import io.airbyte.cdk.load.http.Request
import io.airbyte.cdk.load.http.RequestMethod
import io.airbyte.cdk.load.http.Response
import io.mockk.every
import io.mockk.mockk
import java.io.InputStream
import kotlin.test.assertEquals
import kotlin.test.assertIs
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
class HubSpotOperationRepositoryTest {
private lateinit var httpClient: HttpClient
private lateinit var repo: HubSpotOperationRepository
@BeforeEach
fun setUp() {
httpClient = mockk()
mockOtherObjectsAsEmpty()
repo = HubSpotOperationRepository(httpClient)
}
private fun mockOtherObjectsAsEmpty() {
// custom objects
every {
httpClient.send(
Request(method = RequestMethod.GET, url = "https://api.hubapi.com/crm/v3/schemas")
)
} answers
{
aResponse(
200,
HubSpotCustomObjectSchemaResponseBuilder()
.withResult(anUnavailableObject())
.build()
)
}
// other standard objects
every {
httpClient.send(
Request(
method = RequestMethod.GET,
url = "https://api.hubapi.com/crm/v3/schemas/COMPANY"
)
)
} returns (aResponse(200, anUnavailableObject().build()))
every {
httpClient.send(
Request(
method = RequestMethod.GET,
url = "https://api.hubapi.com/crm/v3/schemas/DEAL"
)
)
} returns (aResponse(200, anUnavailableObject().build()))
}
@Test
internal fun `test when fetch all then return contact dedupe`() {
every {
httpClient.send(
Request(
method = RequestMethod.GET,
url = "https://api.hubapi.com/crm/v3/schemas/CONTACT"
)
)
} returns aResponse(200, aResponseBodyWithEmailProperty().build())
val operations = repo.fetchAll()
assertEquals(1, operations.size)
val operation = operations[0]
assertEquals(Dedupe(emptyList(), emptyList()), operation.syncMode)
assertEquals(1, operation.schema.asColumns().size)
assertEquals(listOf(listOf("email")), operation.matchingKeys)
assertSchemaWithProperties(operations[0], setOf("email"))
}
@Test
internal fun `test given when fetch all then return available fields as part of schema`() {
every {
httpClient.send(
Request(
method = RequestMethod.GET,
url = "https://api.hubapi.com/crm/v3/schemas/CONTACT"
)
)
} returns
aResponse(
200,
aResponseBodyWithEmailProperty()
.withProperty(anAvailableProperty("available"))
.build()
)
val operations = repo.fetchAll()
assertEquals(1, operations.size)
assertSchemaWithProperties(operations[0], setOf("email", "available"))
}
@Test
internal fun `test given calculated field when fetch all then field is not part of schema`() {
every {
httpClient.send(
Request(
method = RequestMethod.GET,
url = "https://api.hubapi.com/crm/v3/schemas/CONTACT"
)
)
} returns
aResponse(
200,
aResponseBodyWithEmailProperty()
.withProperty(aProperty("notAvailable").withCalculated(true))
.build()
)
val operations = repo.fetchAll()
assertEquals(1, operations.size)
assertSchemaWithProperties(operations[0], setOf("email"))
}
@Test
internal fun `test given read only field when fetch all then field is not part of schema`() {
every {
httpClient.send(
Request(
method = RequestMethod.GET,
url = "https://api.hubapi.com/crm/v3/schemas/CONTACT"
)
)
} returns
aResponse(
200,
aResponseBodyWithEmailProperty()
.withProperty(aProperty("notAvailable").withReadOnlyValue(true))
.build()
)
val operations = repo.fetchAll()
assertEquals(1, operations.size)
assertSchemaWithProperties(operations[0], setOf("email"))
}
@Test
internal fun `test given hubspot internal type when fetch all then field is not part of schema`() {
every {
httpClient.send(
Request(
method = RequestMethod.GET,
url = "https://api.hubapi.com/crm/v3/schemas/CONTACT"
)
)
} returns
aResponse(
200,
aResponseBodyWithEmailProperty()
.withProperty(aProperty("notAvailable").withType("object_coordinates"))
.build()
)
val operations = repo.fetchAll()
assertEquals(1, operations.size)
assertSchemaWithProperties(operations[0], setOf("email"))
}
private fun anAvailableProperty(name: String): HubSpotPropertySchemaBuilder =
aProperty(name).withType("string").withCalculated(false).withReadOnlyValue(false)
private fun assertSchemaWithProperties(operation: DestinationOperation, fields: Set<String>) {
assertIs<ObjectType>(operation.schema)
val schema = operation.schema as ObjectType
assertEquals(fields, schema.properties.keys)
}
private fun aResponseBodyWithEmailProperty(): HubSpotSchemaResponseBuilder =
HubSpotSchemaResponseBuilder()
.withName("CONTACT")
.withProperty(anAvailableProperty("email"))
private fun aProperty(name: String): HubSpotPropertySchemaBuilder =
HubSpotPropertySchemaBuilder().withName(name)
fun aResponse(statusCode: Int, body: InputStream): Response {
val response = mockk<Response>()
every { response.statusCode } returns statusCode
every { response.body } returns body
every { response.close() } returns Unit
return response
}
private fun anUnavailableObject(): HubSpotSchemaResponseBuilder =
HubSpotSchemaResponseBuilder().withName("unavailable")
}

View File

@@ -0,0 +1,83 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.destination.hubspot.http
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.node.ObjectNode
import io.airbyte.cdk.util.Jsons
import java.io.InputStream
class HubSpotCustomObjectSchemaResponseBuilder {
private val response = Jsons.objectNode()
private val results = response.putArray("results")
fun withResult(
builder: HubSpotSchemaResponseBuilder
): HubSpotCustomObjectSchemaResponseBuilder {
results.add(builder.schema)
return this
}
fun build(): InputStream {
return ObjectMapper().writeValueAsString(response).byteInputStream()
}
}
class HubSpotSchemaResponseBuilder {
internal val schema = Jsons.objectNode()
private val properties = schema.putArray("properties")
init {
this.withName("any_object_name")
}
fun withName(name: String): HubSpotSchemaResponseBuilder {
schema.put("name", name)
return this
}
fun withProperty(
property: HubSpotPropertySchemaBuilder,
): HubSpotSchemaResponseBuilder {
properties.add(property.build())
return this
}
fun build(): InputStream {
return ObjectMapper().writeValueAsString(schema).byteInputStream()
}
}
class HubSpotPropertySchemaBuilder {
private val node = Jsons.objectNode().apply { this.putObject("modificationMetadata") }
init {
this.withName("any_name").withType("string").withCalculated(false).withReadOnlyValue(false)
}
fun withCalculated(calculated: Boolean): HubSpotPropertySchemaBuilder {
node.put("calculated", calculated)
return this
}
fun withName(name: String): HubSpotPropertySchemaBuilder {
node.put("name", name)
return this
}
fun withType(type: String): HubSpotPropertySchemaBuilder {
node.put("type", type)
return this
}
fun withReadOnlyValue(readOnlyValue: Boolean): HubSpotPropertySchemaBuilder {
(node.get("modificationMetadata") as ObjectNode).put("readOnlyValue", readOnlyValue)
return this
}
fun build(): ObjectNode {
return node
}
}

View File

@@ -61,3 +61,17 @@ During app the app installation, you might see scopes related to objects we don'
### 403 Forbidden Error
Hubspot has **scopes** for each API call. Each stream is tied to a scope and will need access to that scope to sync data. Review the Hubspot OAuth scope documentation [here](https://developers.hubspot.com/docs/api/working-with-oauth#scopes).
## Changelog
<details>
<summary>Expand to review</summary>
| Version | Date | Pull Request | Subject |
|:--------|:-----------|:----------------------------------------------------------------|:---------------------------|
| 0.0.4 | 2025-08-01 | [64144](https://github.com/airbytehq/airbyte/pull/64144) | OSS release |
| 0.0.3 | 2025-07-18 | [205](https://github.com/airbytehq/airbyte-enterprise/pull/205) | Forcing new release |
| 0.0.2 | 2025-07-18 | [204](https://github.com/airbytehq/airbyte-enterprise/pull/204) | Fixing auth |
| 0.0.1 | 2025-07-18 | [201](https://github.com/airbytehq/airbyte-enterprise/pull/201) | First iteration internally |
</details>