🐛 Source Github: return AirbyteMessage if max retry exeeded for 202 status code (#32679)
Co-authored-by: darynaishchenko <darynaishchenko@users.noreply.github.com>
This commit is contained in:
File diff suppressed because one or more lines are too long
@@ -10,7 +10,7 @@ data:
|
||||
connectorSubtype: api
|
||||
connectorType: source
|
||||
definitionId: ef69ef6e-aa7f-4af1-a01d-ef775033524e
|
||||
dockerImageTag: 1.5.3
|
||||
dockerImageTag: 1.5.4
|
||||
dockerRepository: airbyte/source-github
|
||||
documentationUrl: https://docs.airbyte.com/integrations/sources/github
|
||||
githubIssueLabel: source-github
|
||||
|
||||
@@ -9,7 +9,8 @@ from urllib import parse
|
||||
|
||||
import pendulum
|
||||
import requests
|
||||
from airbyte_cdk.models import SyncMode
|
||||
from airbyte_cdk.models import AirbyteLogMessage, AirbyteMessage, Level, SyncMode
|
||||
from airbyte_cdk.models import Type as MessageType
|
||||
from airbyte_cdk.sources.streams.availability_strategy import AvailabilityStrategy
|
||||
from airbyte_cdk.sources.streams.http import HttpStream
|
||||
from airbyte_cdk.sources.streams.http.exceptions import DefaultBackoffException
|
||||
@@ -1606,8 +1607,13 @@ class ContributorActivity(GithubStream):
|
||||
yield from super().read_records(stream_slice=stream_slice, **kwargs)
|
||||
except HTTPError as e:
|
||||
if e.response.status_code == requests.codes.ACCEPTED:
|
||||
self.logger.info(f"Syncing `{self.__class__.__name__}` stream isn't available for repository `{repository}`.")
|
||||
yield
|
||||
yield AirbyteMessage(
|
||||
type=MessageType.LOG,
|
||||
log=AirbyteLogMessage(
|
||||
level=Level.INFO,
|
||||
message=f"Syncing `{self.__class__.__name__}` " f"stream isn't available for repository `{repository}`.",
|
||||
),
|
||||
)
|
||||
else:
|
||||
raise e
|
||||
|
||||
|
||||
@@ -10,11 +10,11 @@ from unittest.mock import MagicMock, patch
|
||||
import pytest
|
||||
import requests
|
||||
import responses
|
||||
from airbyte_cdk.models import SyncMode
|
||||
from airbyte_cdk.models import ConfiguredAirbyteCatalog, SyncMode
|
||||
from airbyte_cdk.sources.streams.http.exceptions import BaseBackoffException, UserDefinedBackoffException
|
||||
from requests import HTTPError
|
||||
from responses import matchers
|
||||
from source_github import constants
|
||||
from source_github import SourceGithub, constants
|
||||
from source_github.streams import (
|
||||
Branches,
|
||||
Collaborators,
|
||||
@@ -1369,21 +1369,50 @@ def test_stream_contributor_activity_parse_empty_response(caplog):
|
||||
|
||||
@responses.activate
|
||||
def test_stream_contributor_activity_accepted_response(caplog):
|
||||
repository_args = {
|
||||
"page_size_for_large_streams": 20,
|
||||
"repositories": ["airbytehq/airbyte"],
|
||||
}
|
||||
stream = ContributorActivity(**repository_args)
|
||||
responses.add(
|
||||
responses.GET,
|
||||
"https://api.github.com/repos/airbytehq/test_airbyte?per_page=100",
|
||||
json={"full_name": "airbytehq/test_airbyte"},
|
||||
status=200,
|
||||
)
|
||||
responses.add(
|
||||
responses.GET,
|
||||
"https://api.github.com/repos/airbytehq/test_airbyte?per_page=100",
|
||||
json={"full_name": "airbytehq/test_airbyte", "default_branch": "default_branch"},
|
||||
status=200,
|
||||
)
|
||||
responses.add(
|
||||
responses.GET,
|
||||
"https://api.github.com/repos/airbytehq/test_airbyte/branches?per_page=100",
|
||||
json={},
|
||||
status=200,
|
||||
)
|
||||
resp = responses.add(
|
||||
responses.GET,
|
||||
"https://api.github.com/repos/airbytehq/airbyte/stats/contributors",
|
||||
"https://api.github.com/repos/airbytehq/test_airbyte/stats/contributors?per_page=100",
|
||||
body="",
|
||||
status=202,
|
||||
)
|
||||
|
||||
source = SourceGithub()
|
||||
configured_catalog = {
|
||||
"streams": [
|
||||
{
|
||||
"stream": {"name": "contributor_activity", "json_schema": {}, "supported_sync_modes": ["full_refresh"],"source_defined_primary_key": [["id"]]},
|
||||
"sync_mode": "full_refresh",
|
||||
"destination_sync_mode": "overwrite"
|
||||
}
|
||||
]
|
||||
}
|
||||
catalog = ConfiguredAirbyteCatalog.parse_obj(configured_catalog)
|
||||
config = {"access_token": "test_token", "repository": "airbytehq/test_airbyte"}
|
||||
logger_mock = MagicMock()
|
||||
|
||||
with patch("time.sleep", return_value=0):
|
||||
list(read_full_refresh(stream))
|
||||
records = list(source.read(config=config, logger=logger_mock, catalog=catalog, state={}))
|
||||
|
||||
assert records[2].log.message == "Syncing `ContributorActivity` stream isn't available for repository `airbytehq/test_airbyte`."
|
||||
assert resp.call_count == 6
|
||||
assert "Syncing `ContributorActivity` stream isn't available for repository `airbytehq/airbyte`." in caplog.messages
|
||||
|
||||
|
||||
@responses.activate
|
||||
|
||||
Reference in New Issue
Block a user