mirror of synced 2025-12-21 11:01:41 -05:00
airbyte/airbyte-integrations/connectors/source-hubspot/unit_tests/utils.py
Brian Lai 4965fca525 fix(source-hubspot): bump to latest version of CDK to get the bug fix for missing custom properties during incremental syncs (#68159)
## What

During incremental syncs, HubSpot CRM Search streams were not including
custom properties in the JSON body of the POST request, so those
properties were never returned by the API or emitted with records.

## How

The bug was in the CDK and was fixed in version `7.3.7` by
https://github.com/airbytehq/airbyte-python-cdk/pull/797

We need to bump the SDM version to get the fix. In addition, we need to
upgrade the unit_test `pyproject.toml`, which is still on v6. I've also
added a new test that validates that properties are populated in the
outbound request, and I fixed the tests that changed with the bump from
v6 to v7.

**Note**: It feels like we have a gap here: our unit tests don't
properly test CDK changes because the two are independently versioned.
This is something we may want to investigate and solve so this kind of
issue doesn't happen again.

## Can this PR be safely reverted and rolled back?

- [ ] YES 💚
- [ ] NO 

Kind of... If we do this wrong, we have to reset customers back to
their previous state, but this is no different from the state we were
previously in.
2025-10-20 11:30:35 -07:00

20 lines
705 B
Python

#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
from airbyte_cdk.sources.streams.concurrent.default_stream import DefaultStream


def run_read(stream_instance: DefaultStream):
    """Read every partition of the stream and return the transformed records."""
    res = []
    schema = stream_instance.get_json_schema()
    partitions = stream_instance._stream_partition_generator.generate()
    for partition in partitions:
        records = partition.read()
        for record in records:
            # Apply the record selector's transformation against the stream schema.
            stream_instance._stream_partition_generator._partition_factory._retriever.record_selector._transform(record, schema)
            res.append(record)
            # Let the cursor track state for each record it sees.
            stream_instance.cursor.observe(record)
        # Close the partition once all of its records have been read.
        stream_instance.cursor.close_partition(partition)
    return res
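
The call sequence that `run_read` performs can be illustrated without the CDK installed. The following is a minimal sketch, not the connector's actual test: `FakePartition`, `FakeCursor`, and `make_fake_stream` are hypothetical stand-ins, records are plain dicts rather than CDK record objects, and the `_transformed` marker is invented only to show that the transform step ran. `run_read` is repeated without its type annotation so the block is self-contained.

```python
from types import SimpleNamespace


def run_read(stream_instance):
    # Same logic as the helper above, minus the DefaultStream annotation.
    res = []
    schema = stream_instance.get_json_schema()
    partitions = stream_instance._stream_partition_generator.generate()
    for partition in partitions:
        records = partition.read()
        for record in records:
            stream_instance._stream_partition_generator._partition_factory._retriever.record_selector._transform(record, schema)
            res.append(record)
            stream_instance.cursor.observe(record)
        stream_instance.cursor.close_partition(partition)
    return res


class FakePartition:
    """Hypothetical partition that yields a fixed list of records."""

    def __init__(self, records):
        self._records = records

    def read(self):
        return iter(self._records)


class FakeCursor:
    """Hypothetical cursor that only records the calls it receives."""

    def __init__(self):
        self.observed = []
        self.closed = []

    def observe(self, record):
        self.observed.append(record)

    def close_partition(self, partition):
        self.closed.append(partition)


def make_fake_stream(partitions):
    """Build an object exposing only the attributes run_read touches."""

    def transform(record, schema):
        # Invented marker so the example can show the transform was applied.
        record["_transformed"] = True

    selector = SimpleNamespace(_transform=transform)
    retriever = SimpleNamespace(record_selector=selector)
    factory = SimpleNamespace(_retriever=retriever)
    generator = SimpleNamespace(
        generate=lambda: iter(partitions),
        _partition_factory=factory,
    )
    return SimpleNamespace(
        get_json_schema=lambda: {"type": "object", "properties": {}},
        _stream_partition_generator=generator,
        cursor=FakeCursor(),
    )


partitions = [FakePartition([{"id": 1}, {"id": 2}]), FakePartition([{"id": 3}])]
stream = make_fake_stream(partitions)
records = run_read(stream)
```

In this sketch every record passes through the transform and the cursor before the partition is closed, which is the ordering the helper relies on when tests inspect the resulting state.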