Source Hubspot: turn on default HttpAvailabilityStrategy (#22479)
Signed-off-by: Sergey Chvalyuk <grubberr@gmail.com>
This commit is contained in:
@@ -799,7 +799,7 @@
|
||||
- name: HubSpot
|
||||
sourceDefinitionId: 36c891d9-4bd9-43ac-bad2-10e12756272c
|
||||
dockerRepository: airbyte/source-hubspot
|
||||
dockerImageTag: 0.3.1
|
||||
dockerImageTag: 0.3.2
|
||||
documentationUrl: https://docs.airbyte.com/integrations/sources/hubspot
|
||||
icon: hubspot.svg
|
||||
sourceType: api
|
||||
|
||||
@@ -6521,7 +6521,7 @@
|
||||
supportsNormalization: false
|
||||
supportsDBT: false
|
||||
supported_destination_sync_modes: []
|
||||
- dockerImage: "airbyte/source-hubspot:0.3.1"
|
||||
- dockerImage: "airbyte/source-hubspot:0.3.2"
|
||||
spec:
|
||||
documentationUrl: "https://docs.airbyte.com/integrations/sources/hubspot"
|
||||
connectionSpecification:
|
||||
|
||||
@@ -34,5 +34,5 @@ COPY source_hubspot ./source_hubspot
|
||||
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
|
||||
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]
|
||||
|
||||
LABEL io.airbyte.version=0.3.1
|
||||
LABEL io.airbyte.version=0.3.2
|
||||
LABEL io.airbyte.name=airbyte/source-hubspot
|
||||
|
||||
File diff suppressed because one or more lines are too long
@@ -4,17 +4,12 @@
|
||||
|
||||
import logging
|
||||
from itertools import chain
|
||||
from typing import Any, Iterator, List, Mapping, MutableMapping, Optional, Tuple, Union
|
||||
from typing import Any, List, Mapping, Optional, Tuple
|
||||
|
||||
import requests
|
||||
from airbyte_cdk.logger import AirbyteLogger
|
||||
from airbyte_cdk.models import AirbyteMessage, AirbyteStateMessage, ConfiguredAirbyteCatalog
|
||||
from airbyte_cdk.sources import AbstractSource
|
||||
from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager
|
||||
from airbyte_cdk.sources.streams import Stream
|
||||
from airbyte_cdk.sources.utils.schema_helpers import split_config
|
||||
from airbyte_cdk.utils.event_timing import create_timer
|
||||
from airbyte_cdk.utils.traced_exception import AirbyteTracedException
|
||||
from requests import HTTPError
|
||||
from source_hubspot.streams import (
|
||||
API,
|
||||
@@ -136,51 +131,3 @@ class SourceHubspot(AbstractSource):
|
||||
available_streams = streams
|
||||
|
||||
return available_streams
|
||||
|
||||
def read(
|
||||
self,
|
||||
logger: logging.Logger,
|
||||
config: Mapping[str, Any],
|
||||
catalog: ConfiguredAirbyteCatalog,
|
||||
state: Union[List[AirbyteStateMessage], MutableMapping[str, Any]] = None,
|
||||
) -> Iterator[AirbyteMessage]:
|
||||
"""
|
||||
This method is overridden to check whether the stream `quotes` exists in the source, if not skip reading that stream.
|
||||
"""
|
||||
logger.info(f"Starting syncing {self.name}")
|
||||
config, internal_config = split_config(config)
|
||||
# TODO assert all streams exist in the connector
|
||||
# get the streams once in case the connector needs to make any queries to generate them
|
||||
stream_instances = {s.name: s for s in self.streams(config)}
|
||||
state_manager = ConnectorStateManager(stream_instance_map=stream_instances, state=state)
|
||||
self._stream_to_instance_map = stream_instances
|
||||
with create_timer(self.name) as timer:
|
||||
for configured_stream in catalog.streams:
|
||||
stream_instance = stream_instances.get(configured_stream.stream.name)
|
||||
if not stream_instance and configured_stream.stream.name == "quotes":
|
||||
logger.warning("Stream `quotes` does not exist in the source. Skip reading `quotes` stream.")
|
||||
continue
|
||||
if not stream_instance:
|
||||
raise KeyError(
|
||||
f"The requested stream {configured_stream.stream.name} was not found in the source. Available streams: {stream_instances.keys()}"
|
||||
)
|
||||
|
||||
try:
|
||||
yield from self._read_stream(
|
||||
logger=logger,
|
||||
stream_instance=stream_instance,
|
||||
configured_stream=configured_stream,
|
||||
state_manager=state_manager,
|
||||
internal_config=internal_config,
|
||||
)
|
||||
except Exception as e:
|
||||
logger.exception(f"Encountered an exception while reading stream {configured_stream.stream.name}")
|
||||
display_message = stream_instance.get_error_display_message(e)
|
||||
if display_message:
|
||||
raise AirbyteTracedException.from_exception(e, message=display_message) from e
|
||||
raise e
|
||||
finally:
|
||||
logger.info(f"Finished syncing {self.name}")
|
||||
logger.info(timer.report())
|
||||
|
||||
logger.info(f"Finished syncing {self.name}")
|
||||
|
||||
@@ -15,7 +15,6 @@ import pendulum as pendulum
|
||||
import requests
|
||||
from airbyte_cdk.entrypoint import logger
|
||||
from airbyte_cdk.models import SyncMode
|
||||
from airbyte_cdk.sources.streams.availability_strategy import AvailabilityStrategy
|
||||
from airbyte_cdk.sources.streams.http import HttpStream
|
||||
from airbyte_cdk.sources.streams.http.requests_native_auth import Oauth2Authenticator, TokenAuthenticator
|
||||
from airbyte_cdk.sources.utils.transform import TransformConfig, TypeTransformer
|
||||
@@ -206,14 +205,9 @@ class Stream(HttpStream, ABC):
|
||||
primary_key = None
|
||||
filter_old_records: bool = True
|
||||
denormalize_records: bool = False # one record from API response can result in multiple records emitted
|
||||
raise_on_http_errors: bool = True
|
||||
granted_scopes: Set = None
|
||||
properties_scopes: Set = None
|
||||
|
||||
@property
|
||||
def availability_strategy(self) -> Optional["AvailabilityStrategy"]:
|
||||
return None
|
||||
|
||||
@property
|
||||
@abstractmethod
|
||||
def scopes(self) -> Set[str]:
|
||||
@@ -263,12 +257,6 @@ class Stream(HttpStream, ABC):
|
||||
if creds_title in (OAUTH_CREDENTIALS, PRIVATE_APP_CREDENTIALS):
|
||||
self._authenticator = api.get_authenticator()
|
||||
|
||||
def should_retry(self, response: requests.Response) -> bool:
|
||||
if response.status_code == HTTPStatus.FORBIDDEN:
|
||||
setattr(self, "raise_on_http_errors", False)
|
||||
logger.warning("You have not permission to API for this stream. " "Please check your scopes for Hubspot account.")
|
||||
return super().should_retry(response)
|
||||
|
||||
def backoff_time(self, response: requests.Response) -> Optional[float]:
|
||||
if response.status_code == codes.too_many_requests:
|
||||
return float(response.headers.get("Retry-After", 3))
|
||||
|
||||
@@ -42,11 +42,6 @@ def some_credentials_fixture():
|
||||
return {"credentials_title": "Private App Credentials", "access_token": "wrong token"}
|
||||
|
||||
|
||||
@pytest.fixture(name="creds_with_wrong_permissions")
|
||||
def creds_with_wrong_permissions():
|
||||
return {"credentials_title": "Private App Credentials", "access_token": "THIS-IS-THE-ACCESS_TOKEN"}
|
||||
|
||||
|
||||
@pytest.fixture(name="fake_properties_list")
|
||||
def fake_properties_list():
|
||||
return [f"property_number_{i}" for i in range(NUMBER_OF_PROPERTIES)]
|
||||
|
||||
@@ -13,7 +13,7 @@ from airbyte_cdk.models import ConfiguredAirbyteCatalog, SyncMode, Type
|
||||
from source_hubspot.errors import HubspotRateLimited
|
||||
from source_hubspot.helpers import APIv3Property
|
||||
from source_hubspot.source import SourceHubspot
|
||||
from source_hubspot.streams import API, Companies, Deals, Engagements, Products, Stream, Workflows
|
||||
from source_hubspot.streams import API, Companies, Deals, Engagements, Products, Stream
|
||||
|
||||
from .utils import read_full_refresh, read_incremental
|
||||
|
||||
@@ -134,38 +134,30 @@ def test_check_connection_backoff_on_server_error(requests_mock, config):
|
||||
assert not error
|
||||
|
||||
|
||||
def test_wrong_permissions_api_key(requests_mock, creds_with_wrong_permissions, common_params, caplog):
|
||||
"""
|
||||
Error with API Key Permissions to particular stream,
|
||||
typically this issue raises along with calling `workflows` stream with API Key
|
||||
that doesn't have required permissions to read the stream.
|
||||
"""
|
||||
|
||||
# Mapping tipical response for mocker
|
||||
def test_stream_forbidden(requests_mock, config, caplog):
|
||||
json = {
|
||||
"status": "error",
|
||||
"message": f'This hapikey ({creds_with_wrong_permissions.get("api_key")}) does not have proper permissions! (requires any of [automation-access])',
|
||||
"correlationId": "2fe0a9af-3609-45c9-a4d7-83a1774121aa",
|
||||
"message": "This access_token does not have proper permissions!",
|
||||
}
|
||||
requests_mock.get("https://api.hubapi.com/automation/v3/workflows", json=json, status_code=403)
|
||||
|
||||
# We expect something like this
|
||||
expected_warining_message = {
|
||||
"type": "LOG",
|
||||
"log": {
|
||||
"level": "WARN",
|
||||
"message": f'Stream `workflows` cannot be procced. This hapikey ({creds_with_wrong_permissions.get("api_key")}) does not have proper permissions! (requires any of [automation-access])',
|
||||
},
|
||||
}
|
||||
catalog = ConfiguredAirbyteCatalog.parse_obj({
|
||||
"streams": [
|
||||
{
|
||||
"stream": {
|
||||
"name": "workflows",
|
||||
"json_schema": {},
|
||||
"supported_sync_modes": ["full_refresh"],
|
||||
},
|
||||
"sync_mode": "full_refresh",
|
||||
"destination_sync_mode": "overwrite"
|
||||
}
|
||||
]
|
||||
})
|
||||
|
||||
# Create test_stream instance
|
||||
test_stream = Workflows(**common_params)
|
||||
|
||||
# Mocking Request
|
||||
requests_mock.register_uri("GET", test_stream.url, json=json, status_code=403)
|
||||
records = list(test_stream.read_records(sync_mode=SyncMode.full_refresh))
|
||||
|
||||
# match logged expected logged warning message with output given from preudo-output
|
||||
assert expected_warining_message["log"]["message"] in caplog.text
|
||||
records = list(SourceHubspot().read(logger, config, catalog, {}))
|
||||
assert json["message"] in caplog.text
|
||||
records = [r for r in records if r.type == Type.RECORD]
|
||||
assert not records
|
||||
|
||||
|
||||
@@ -328,17 +320,6 @@ def configured_catalog_fixture():
|
||||
return ConfiguredAirbyteCatalog.parse_obj(configured_catalog)
|
||||
|
||||
|
||||
def test_it_should_not_read_quotes_stream_if_it_does_not_exist_in_client(oauth_config, configured_catalog):
|
||||
"""
|
||||
If 'quotes' stream is not in the client, it should skip it.
|
||||
"""
|
||||
source = SourceHubspot()
|
||||
|
||||
all_records = list(source.read(logger, config=oauth_config, catalog=configured_catalog, state=None))
|
||||
records = [record for record in all_records if record.type == Type.RECORD]
|
||||
assert not records
|
||||
|
||||
|
||||
def test_search_based_stream_should_not_attempt_to_get_more_than_10k_records(requests_mock, common_params, fake_properties_list):
|
||||
"""
|
||||
If there are more than 10,000 records that would be returned by the Hubspot search endpoint,
|
||||
|
||||
@@ -126,8 +126,9 @@ Now that you have set up the Hubspot source connector, check out the following H
|
||||
|
||||
| Version | Date | Pull Request | Subject |
|
||||
|:--------|:-----------|:---------------------------------------------------------|:-----------------------------------------------------------------------------------------------------------------------------------------------|
|
||||
| 0.3.1 | 2023-01-27 | [22009](https://github.com/airbytehq/airbyte/pull/22009) | Set `AvailabilityStrategy` for streams explicitly to `None` |
|
||||
| 0.3.0 | 2022-10-27 | [18546](https://github.com/airbytehq/airbyte/pull/18546) | Sunsetting API Key authentication. `Quotes` stream is no longer available |
|
||||
| 0.3.2 | 2023-02-07 | [22479](https://github.com/airbytehq/airbyte/pull/22479) | Turn on default HttpAvailabilityStrategy |
|
||||
| 0.3.1 | 2023-01-27 | [22009](https://github.com/airbytehq/airbyte/pull/22009) | Set `AvailabilityStrategy` for streams explicitly to `None` |
|
||||
| 0.3.0 | 2022-10-27 | [18546](https://github.com/airbytehq/airbyte/pull/18546) | Sunsetting API Key authentication. `Quotes` stream is no longer available |
|
||||
| 0.2.2 | 2022-10-03 | [16914](https://github.com/airbytehq/airbyte/pull/16914) | Fix 403 forbidden error validation |
|
||||
| 0.2.1 | 2022-09-26 | [17120](https://github.com/airbytehq/airbyte/pull/17120) | Migrate to per-stream state. |
|
||||
| 0.2.0 | 2022-09-13 | [16632](https://github.com/airbytehq/airbyte/pull/16632) | Remove Feedback Submissions stream as the one using unstable (beta) API. |
|
||||
|
||||
Reference in New Issue
Block a user