fix: do not return operation if matching key predicate but none found (#63699)
Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com> Co-authored-by: Jimmy Ma <gosusnp@users.noreply.github.com>
This commit is contained in:
committed by
GitHub
parent
2615b28913
commit
d2da528a31
@@ -7,7 +7,6 @@ package io.airbyte.cdk.load.discoverer.operation
|
||||
import com.fasterxml.jackson.databind.JsonNode
|
||||
import com.fasterxml.jackson.databind.node.ArrayNode
|
||||
import io.airbyte.cdk.load.command.DestinationOperation
|
||||
import io.airbyte.cdk.load.command.ImportType
|
||||
import io.airbyte.cdk.load.data.AirbyteType
|
||||
import io.airbyte.cdk.load.data.ObjectType
|
||||
import io.airbyte.cdk.load.discoverer.destinationobject.DestinationObject
|
||||
@@ -27,7 +26,7 @@ private val logger = KotlinLogging.logger {}
|
||||
*/
|
||||
class DestinationOperationAssembler(
|
||||
private val propertiesPath: List<String>,
|
||||
private val propertyFactoriesByImportType: Map<ImportType, DiscoveredPropertyFactory>,
|
||||
private val insertionMethods: List<InsertionMethod>,
|
||||
private val schemaRequester: HttpRequester?,
|
||||
) {
|
||||
// FIXME once we figure out the decoder interface, we should have this configurable and/or move
|
||||
@@ -55,8 +54,8 @@ class DestinationOperationAssembler(
|
||||
throw IllegalStateException("The schema returned by the API does not have properties")
|
||||
}
|
||||
|
||||
return propertyFactoriesByImportType.mapNotNull { (importType, propertyFactory) ->
|
||||
createOperation(destinationObject.name, apiSchema, importType, propertyFactory)
|
||||
return insertionMethods.mapNotNull {
|
||||
createOperation(destinationObject.name, apiSchema, it)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -73,11 +72,10 @@ class DestinationOperationAssembler(
|
||||
private fun createOperation(
|
||||
objectName: String,
|
||||
schemaFromApi: JsonNode,
|
||||
importType: ImportType,
|
||||
propertyFactory: DiscoveredPropertyFactory
|
||||
insertionMethod: InsertionMethod
|
||||
): DestinationOperation? {
|
||||
val properties =
|
||||
schemaFromApi.extractArray(propertiesPath).map { propertyFactory.create(it) }
|
||||
schemaFromApi.extractArray(propertiesPath).map { insertionMethod.createProperty(it) }
|
||||
val matchingKeys =
|
||||
properties.filter {
|
||||
it.isMatchingKey()
|
||||
@@ -86,13 +84,18 @@ class DestinationOperationAssembler(
|
||||
|
||||
if (propertiesForSyncMode.isEmpty()) {
|
||||
logger.warn {
|
||||
"Object $objectName with operation $importType has no properties and therefore will not be added to the catalog"
|
||||
"Object $objectName with operation ${insertionMethod.getImportType()} has no properties and therefore will not be added to the catalog"
|
||||
}
|
||||
return null
|
||||
} else if (insertionMethod.requiresMatchingKey() && matchingKeys.isEmpty()) {
|
||||
logger.warn {
|
||||
"Object $objectName with operation ${insertionMethod.getImportType()} requires at least one matching key but none was found"
|
||||
}
|
||||
return null
|
||||
}
|
||||
return DestinationOperation(
|
||||
objectName,
|
||||
importType,
|
||||
insertionMethod.getImportType(),
|
||||
getSchema(propertiesForSyncMode),
|
||||
matchingKeys.map { listOf(it.getName()) },
|
||||
)
|
||||
|
||||
@@ -1,30 +0,0 @@
|
||||
/*
|
||||
* Copyright (c) 2025 Airbyte, Inc., all rights reserved.
|
||||
*/
|
||||
|
||||
package io.airbyte.cdk.load.discoverer.operation
|
||||
|
||||
import com.fasterxml.jackson.databind.JsonNode
|
||||
import io.airbyte.cdk.load.data.AirbyteType
|
||||
import java.util.function.Predicate
|
||||
|
||||
class DiscoveredPropertyFactory(
|
||||
private val namePath: List<String>,
|
||||
private val typePath: List<String>,
|
||||
private val matchingKeyPredicate: Predicate<JsonNode>,
|
||||
private val availabilityPredicate: Predicate<JsonNode>,
|
||||
private val requiredPredicate: Predicate<JsonNode>,
|
||||
private val typeMapper: Map<String, AirbyteType>,
|
||||
) {
|
||||
fun create(apiRepresentation: JsonNode): DiscoveredProperty {
|
||||
return DiscoveredProperty(
|
||||
apiRepresentation,
|
||||
namePath,
|
||||
typePath,
|
||||
matchingKeyPredicate,
|
||||
availabilityPredicate,
|
||||
requiredPredicate,
|
||||
typeMapper,
|
||||
)
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,46 @@
|
||||
/*
|
||||
* Copyright (c) 2025 Airbyte, Inc., all rights reserved.
|
||||
*/
|
||||
|
||||
package io.airbyte.cdk.load.discoverer.operation
|
||||
|
||||
import com.fasterxml.jackson.databind.JsonNode
|
||||
import io.airbyte.cdk.load.command.ImportType
|
||||
import io.airbyte.cdk.load.data.AirbyteType
|
||||
import java.util.function.Predicate
|
||||
|
||||
/**
|
||||
* Describes the information related to how data is inserted in an object.
|
||||
*
|
||||
* Note that if matchingKeyPredicate is provided, it needs to return `true` to at least one of the
|
||||
* properties. If this is not the case, the insertion method for this object will not be returned as
|
||||
* part of the discover command. If not provided, it means that it is expected not to have matching
|
||||
* keys.
|
||||
*/
|
||||
class InsertionMethod(
|
||||
private val importType: ImportType,
|
||||
private val namePath: List<String>,
|
||||
private val typePath: List<String>,
|
||||
private val matchingKeyPredicate: Predicate<JsonNode>?,
|
||||
private val availabilityPredicate: Predicate<JsonNode>,
|
||||
private val requiredPredicate: Predicate<JsonNode>,
|
||||
private val typeMapper: Map<String, AirbyteType>,
|
||||
) {
|
||||
fun getImportType(): ImportType = importType
|
||||
|
||||
fun createProperty(apiRepresentation: JsonNode): DiscoveredProperty {
|
||||
return DiscoveredProperty(
|
||||
apiRepresentation,
|
||||
namePath,
|
||||
typePath,
|
||||
matchingKeyPredicate ?: Predicate { _ -> false },
|
||||
availabilityPredicate,
|
||||
requiredPredicate,
|
||||
typeMapper,
|
||||
)
|
||||
}
|
||||
|
||||
fun requiresMatchingKey(): Boolean {
|
||||
return matchingKeyPredicate != null
|
||||
}
|
||||
}
|
||||
@@ -21,7 +21,7 @@ import org.junit.jupiter.api.Test
|
||||
class DestinationOperationAssemblerTest {
|
||||
|
||||
lateinit var schemaRequester: HttpRequester
|
||||
lateinit var factory: DiscoveredPropertyFactory
|
||||
lateinit var insertionMethod: InsertionMethod
|
||||
lateinit var property: DiscoveredProperty
|
||||
|
||||
companion object {
|
||||
@@ -34,8 +34,10 @@ class DestinationOperationAssemblerTest {
|
||||
fun setUp() {
|
||||
schemaRequester = mockk()
|
||||
property = mockk(relaxed = true)
|
||||
factory = mockk()
|
||||
every { factory.create(any()) } returns property
|
||||
insertionMethod = mockk()
|
||||
every { insertionMethod.getImportType() } returns SoftDelete
|
||||
every { insertionMethod.requiresMatchingKey() } returns false
|
||||
every { insertionMethod.createProperty(any()) } returns property
|
||||
}
|
||||
|
||||
@Test
|
||||
@@ -44,7 +46,7 @@ class DestinationOperationAssemblerTest {
|
||||
val assembler =
|
||||
DestinationOperationAssembler(
|
||||
listOf(PROPERTY_PATH),
|
||||
mapOf(SoftDelete to factory),
|
||||
listOf(insertionMethod),
|
||||
NO_SCHEMA_REQUESTER,
|
||||
)
|
||||
|
||||
@@ -60,7 +62,7 @@ class DestinationOperationAssemblerTest {
|
||||
val assembler =
|
||||
DestinationOperationAssembler(
|
||||
listOf(PROPERTY_PATH),
|
||||
mapOf(SoftDelete to factory),
|
||||
listOf(insertionMethod),
|
||||
NO_SCHEMA_REQUESTER,
|
||||
)
|
||||
|
||||
@@ -75,7 +77,7 @@ class DestinationOperationAssemblerTest {
|
||||
val assembler =
|
||||
DestinationOperationAssembler(
|
||||
listOf(PROPERTY_PATH),
|
||||
mapOf(SoftDelete to factory),
|
||||
listOf(insertionMethod),
|
||||
NO_SCHEMA_REQUESTER,
|
||||
)
|
||||
|
||||
@@ -94,7 +96,7 @@ class DestinationOperationAssemblerTest {
|
||||
val assembler =
|
||||
DestinationOperationAssembler(
|
||||
listOf(PROPERTY_PATH),
|
||||
mapOf(SoftDelete to factory),
|
||||
listOf(insertionMethod),
|
||||
schemaRequester,
|
||||
)
|
||||
|
||||
@@ -103,6 +105,42 @@ class DestinationOperationAssemblerTest {
|
||||
assertEquals(1, operations.size)
|
||||
}
|
||||
|
||||
@Test
|
||||
internal fun `test given matching keys expected but none found when assemble then do not return operation`() {
|
||||
every { insertionMethod.requiresMatchingKey() } returns true
|
||||
every { property.isMatchingKey() } returns false
|
||||
every { property.isAvailable() } returns true
|
||||
val assembler =
|
||||
DestinationOperationAssembler(
|
||||
listOf(PROPERTY_PATH),
|
||||
listOf(insertionMethod),
|
||||
NO_SCHEMA_REQUESTER,
|
||||
)
|
||||
|
||||
val operations =
|
||||
assembler.assemble(DestinationObject(OBJECT_NAME, apiRepresentationWithOneProperty()))
|
||||
|
||||
assertEquals(0, operations.size)
|
||||
}
|
||||
|
||||
@Test
|
||||
internal fun `test given matching keys expected and found when assemble then return operation`() {
|
||||
every { insertionMethod.requiresMatchingKey() } returns true
|
||||
every { property.isMatchingKey() } returns true
|
||||
every { property.isAvailable() } returns true
|
||||
val assembler =
|
||||
DestinationOperationAssembler(
|
||||
listOf(PROPERTY_PATH),
|
||||
listOf(insertionMethod),
|
||||
NO_SCHEMA_REQUESTER,
|
||||
)
|
||||
|
||||
val operations =
|
||||
assembler.assemble(DestinationObject(OBJECT_NAME, apiRepresentationWithOneProperty()))
|
||||
|
||||
assertEquals(1, operations.size)
|
||||
}
|
||||
|
||||
private fun apiRepresentationWithOneProperty(): ObjectNode =
|
||||
Jsons.objectNode().apply { this.putArray(PROPERTY_PATH).add(Jsons.objectNode()) }
|
||||
}
|
||||
|
||||
@@ -38,6 +38,14 @@ The HubSpot source connector supports the following streams:
|
||||
|
||||
## Limitations & Troubleshooting
|
||||
|
||||
### Destination Object Not Showing Up
|
||||
|
||||
Except from the CONTACT object, the upsert method for this connector requires a unique value field to be present on the destination object. In order to create a unique value property, go in HubSpot and do the following:
|
||||
* In the CRM menu in the left-hand side, select the object you want to sync
|
||||
* Under `Actions`, select `Edit Properties`
|
||||
* Click on `Create property`
|
||||
* When entering the rules, check `Require unique values for this property`
|
||||
|
||||
### Rate limiting
|
||||
|
||||
The connector is restricted by normal HubSpot [rate limitations](https://developers.hubspot.com/docs/guides/apps/api-usage/usage-details#public-apps).
|
||||
|
||||
Reference in New Issue
Block a user