* Issue #1353: implement backoff for integration tests * update code for backoff HTTP error while read Google Sheet * create Client class for Google Sheets with backoff all methods * update Google Sheets Source after review #2 * update docker version for google_sheets_source
This commit is contained in:
@@ -11,5 +11,5 @@ COPY $CODE_PATH ./$CODE_PATH
|
||||
COPY setup.py ./
|
||||
RUN pip install .
|
||||
|
||||
LABEL io.airbyte.version=0.1.4
|
||||
LABEL io.airbyte.version=0.1.5
|
||||
LABEL io.airbyte.name=airbyte/source-google-sheets
|
||||
|
||||
@@ -0,0 +1,56 @@
|
||||
"""
|
||||
MIT License
|
||||
|
||||
Copyright (c) 2020 Airbyte
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
of this software and associated documentation files (the "Software"), to deal
|
||||
in the Software without restriction, including without limitation the rights
|
||||
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
copies of the Software, and to permit persons to whom the Software is
|
||||
furnished to do so, subject to the following conditions:
|
||||
|
||||
The above copyright notice and this permission notice shall be included in all
|
||||
copies or substantial portions of the Software.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||
SOFTWARE.
|
||||
"""
|
||||
|
||||
from typing import Dict, List
|
||||
|
||||
import backoff
|
||||
from googleapiclient import errors
|
||||
from requests import codes as status_codes
|
||||
|
||||
from .helpers import SCOPES, Helpers
|
||||
|
||||
|
||||
def error_handler(error):
|
||||
return error.resp.status != status_codes.TOO_MANY_REQUESTS
|
||||
|
||||
|
||||
class GoogleSheetsClient:
|
||||
def __init__(self, credentials: Dict[str, str], scopes: List[str] = SCOPES):
|
||||
self.client = Helpers.get_authenticated_sheets_client(credentials, scopes)
|
||||
|
||||
@backoff.on_exception(backoff.expo, errors.HttpError, max_time=60, giveup=error_handler)
|
||||
def get(self, **kwargs):
|
||||
return self.client.get(**kwargs).execute()
|
||||
|
||||
@backoff.on_exception(backoff.expo, errors.HttpError, max_time=60, giveup=error_handler)
|
||||
def create(self, **kwargs):
|
||||
return self.client.create(**kwargs).execute()
|
||||
|
||||
@backoff.on_exception(backoff.expo, errors.HttpError, max_time=60, giveup=error_handler)
|
||||
def get_values(self, **kwargs):
|
||||
return self.client.values().batchGet(**kwargs).execute()
|
||||
|
||||
@backoff.on_exception(backoff.expo, errors.HttpError, max_time=60, giveup=error_handler)
|
||||
def update_values(self, **kwargs):
|
||||
return self.client.values().batchUpdate(**kwargs).execute()
|
||||
@@ -28,7 +28,9 @@ from typing import Dict, Generator
|
||||
from airbyte_protocol import AirbyteCatalog, AirbyteConnectionStatus, AirbyteMessage, ConfiguredAirbyteCatalog, Status, Type
|
||||
from apiclient import errors
|
||||
from base_python import AirbyteLogger, Source
|
||||
from requests.status_codes import codes as status_codes
|
||||
|
||||
from .client import GoogleSheetsClient
|
||||
from .helpers import Helpers
|
||||
from .models.spreadsheet import Spreadsheet
|
||||
from .models.spreadsheet_values import SpreadsheetValues
|
||||
@@ -46,15 +48,15 @@ class GoogleSheetsSource(Source):
|
||||
|
||||
def check(self, logger: AirbyteLogger, config: json) -> AirbyteConnectionStatus:
|
||||
# Check involves verifying that the specified spreadsheet is reachable with our credentials.
|
||||
client = Helpers.get_authenticated_sheets_client(json.loads(config["credentials_json"]))
|
||||
client = GoogleSheetsClient(json.loads(config["credentials_json"]))
|
||||
spreadsheet_id = config["spreadsheet_id"]
|
||||
try:
|
||||
# Attempt to get first row of sheet
|
||||
client.get(spreadsheetId=spreadsheet_id, includeGridData=False, ranges="1:1").execute()
|
||||
client.get(spreadsheetId=spreadsheet_id, includeGridData=False, ranges="1:1")
|
||||
except errors.HttpError as err:
|
||||
reason = str(err)
|
||||
# Give a clearer message if it's a common error like 404.
|
||||
if err.resp.status == 404:
|
||||
if err.resp.status == status_codes.NOT_FOUND:
|
||||
reason = "Requested spreadsheet was not found."
|
||||
logger.error(f"Formatted error: {reason}")
|
||||
return AirbyteConnectionStatus(status=Status.FAILED, message=str(reason))
|
||||
@@ -62,11 +64,11 @@ class GoogleSheetsSource(Source):
|
||||
return AirbyteConnectionStatus(status=Status.SUCCEEDED)
|
||||
|
||||
def discover(self, logger: AirbyteLogger, config: json) -> AirbyteCatalog:
|
||||
client = Helpers.get_authenticated_sheets_client(json.loads(config["credentials_json"]))
|
||||
client = GoogleSheetsClient(json.loads(config["credentials_json"]))
|
||||
spreadsheet_id = config["spreadsheet_id"]
|
||||
try:
|
||||
logger.info(f"Running discovery on sheet {spreadsheet_id}")
|
||||
spreadsheet_metadata = Spreadsheet.parse_obj(client.get(spreadsheetId=spreadsheet_id, includeGridData=False).execute())
|
||||
spreadsheet_metadata = Spreadsheet.parse_obj(client.get(spreadsheetId=spreadsheet_id, includeGridData=False))
|
||||
sheet_names = [sheet.properties.title for sheet in spreadsheet_metadata.sheets]
|
||||
streams = []
|
||||
for sheet_name in sheet_names:
|
||||
@@ -77,14 +79,14 @@ class GoogleSheetsSource(Source):
|
||||
|
||||
except errors.HttpError as err:
|
||||
reason = str(err)
|
||||
if err.resp.status == 404:
|
||||
if err.resp.status == status_codes.NOT_FOUND:
|
||||
reason = "Requested spreadsheet was not found."
|
||||
raise Exception(f"Could not run discovery: {reason}")
|
||||
|
||||
def read(
|
||||
self, logger: AirbyteLogger, config: json, catalog: ConfiguredAirbyteCatalog, state: Dict[str, any]
|
||||
) -> Generator[AirbyteMessage, None, None]:
|
||||
client = Helpers.get_authenticated_sheets_client(json.loads(config["credentials_json"]))
|
||||
client = GoogleSheetsClient(json.loads(config["credentials_json"]))
|
||||
|
||||
sheet_to_column_name = Helpers.parse_sheet_and_column_names_from_catalog(catalog)
|
||||
spreadsheet_id = config["spreadsheet_id"]
|
||||
@@ -102,7 +104,7 @@ class GoogleSheetsSource(Source):
|
||||
range = f"{sheet}!{row_cursor}:{row_cursor + ROW_BATCH_SIZE}"
|
||||
logger.info(f"Fetching range {range}")
|
||||
row_batch = SpreadsheetValues.parse_obj(
|
||||
client.values().batchGet(spreadsheetId=spreadsheet_id, ranges=range, majorDimension="ROWS").execute()
|
||||
client.get_values(spreadsheetId=spreadsheet_id, ranges=range, majorDimension="ROWS")
|
||||
)
|
||||
row_cursor += ROW_BATCH_SIZE + 1
|
||||
# there should always be one range since we requested only one
|
||||
|
||||
@@ -27,8 +27,8 @@ from datetime import datetime
|
||||
from typing import Dict, FrozenSet, Iterable, List
|
||||
|
||||
from airbyte_protocol import AirbyteRecordMessage, AirbyteStream, ConfiguredAirbyteCatalog
|
||||
from apiclient import discovery
|
||||
from google.oauth2 import service_account
|
||||
from googleapiclient import discovery
|
||||
|
||||
from .models.spreadsheet import RowData, Spreadsheet
|
||||
|
||||
@@ -37,17 +37,17 @@ SCOPES = ["https://www.googleapis.com/auth/spreadsheets.readonly", "https://www.
|
||||
|
||||
class Helpers(object):
|
||||
@staticmethod
|
||||
def get_authenticated_sheets_client(credentials: Dict[str, str], scopes=SCOPES) -> discovery.Resource:
|
||||
def get_authenticated_sheets_client(credentials: Dict[str, str], scopes: List[str] = SCOPES) -> discovery.Resource:
|
||||
creds = Helpers.get_authenticated_google_credentials(credentials, scopes)
|
||||
return discovery.build("sheets", "v4", credentials=creds).spreadsheets()
|
||||
|
||||
@staticmethod
|
||||
def get_authenticated_drive_client(credentials: Dict[str, str], scopes=SCOPES) -> discovery.Resource:
|
||||
def get_authenticated_drive_client(credentials: Dict[str, str], scopes: List[str] = SCOPES) -> discovery.Resource:
|
||||
creds = Helpers.get_authenticated_google_credentials(credentials, scopes)
|
||||
return discovery.build("drive", "v3", credentials=creds)
|
||||
|
||||
@staticmethod
|
||||
def get_authenticated_google_credentials(credentials: Dict[str, str], scopes=SCOPES):
|
||||
def get_authenticated_google_credentials(credentials: Dict[str, str], scopes: List[str] = SCOPES):
|
||||
return service_account.Credentials.from_service_account_info(credentials, scopes=scopes)
|
||||
|
||||
@staticmethod
|
||||
@@ -86,10 +86,8 @@ class Helpers(object):
|
||||
return [value.formattedValue for value in row_data.values]
|
||||
|
||||
@staticmethod
|
||||
def get_first_row(client: discovery.Resource, spreadsheet_id: str, sheet_name: str) -> List[str]:
|
||||
spreadsheet = Spreadsheet.parse_obj(
|
||||
client.get(spreadsheetId=spreadsheet_id, includeGridData=True, ranges=f"{sheet_name}!1:1").execute()
|
||||
)
|
||||
def get_first_row(client, spreadsheet_id: str, sheet_name: str) -> List[str]:
|
||||
spreadsheet = Spreadsheet.parse_obj(client.get(spreadsheetId=spreadsheet_id, includeGridData=True, ranges=f"{sheet_name}!1:1"))
|
||||
|
||||
# There is only one sheet since we are specifying the sheet in the requested ranges.
|
||||
returned_sheets = spreadsheet.sheets
|
||||
@@ -133,7 +131,7 @@ class Helpers(object):
|
||||
|
||||
@staticmethod
|
||||
def get_available_sheets_to_column_index_to_name(
|
||||
client: discovery.Resource, spreadsheet_id: str, requested_sheets_and_columns: Dict[str, FrozenSet[str]]
|
||||
client, spreadsheet_id: str, requested_sheets_and_columns: Dict[str, FrozenSet[str]]
|
||||
) -> Dict[str, Dict[int, str]]:
|
||||
available_sheets = Helpers.get_sheets_in_spreadsheet(client, spreadsheet_id)
|
||||
|
||||
@@ -150,8 +148,8 @@ class Helpers(object):
|
||||
return available_sheets_to_column_index_to_name
|
||||
|
||||
@staticmethod
|
||||
def get_sheets_in_spreadsheet(client: discovery.Resource, spreadsheet_id: str):
|
||||
spreadsheet_metadata = Spreadsheet.parse_obj(client.get(spreadsheetId=spreadsheet_id, includeGridData=False).execute())
|
||||
def get_sheets_in_spreadsheet(client, spreadsheet_id: str):
|
||||
spreadsheet_metadata = Spreadsheet.parse_obj(client.get(spreadsheetId=spreadsheet_id, includeGridData=False))
|
||||
return [sheet.properties.title for sheet in spreadsheet_metadata.sheets]
|
||||
|
||||
@staticmethod
|
||||
|
||||
@@ -29,8 +29,8 @@ from pathlib import Path
|
||||
from typing import Dict
|
||||
|
||||
from airbyte_protocol import ConfiguredAirbyteCatalog, ConnectorSpecification
|
||||
from apiclient import discovery
|
||||
from base_python_test import StandardSourceTestIface
|
||||
from google_sheets_source.client import GoogleSheetsClient
|
||||
from google_sheets_source.helpers import Helpers
|
||||
from google_sheets_source.models.spreadsheet import Spreadsheet
|
||||
|
||||
@@ -61,8 +61,8 @@ class GoogleSheetsSourceStandardTest(StandardSourceTestIface):
|
||||
def setup(self) -> None:
|
||||
Path(self._get_tmp_dir()).mkdir(parents=True, exist_ok=True)
|
||||
|
||||
sheets_client = Helpers.get_authenticated_sheets_client(self._get_creds(), SCOPES)
|
||||
spreadsheet_id = GoogleSheetsSourceStandardTest._create_spreadsheet(sheets_client)
|
||||
sheets_client = GoogleSheetsClient(self._get_creds(), SCOPES)
|
||||
spreadsheet_id = self._create_spreadsheet(sheets_client)
|
||||
self._write_spreadsheet_id(spreadsheet_id)
|
||||
|
||||
def teardown(self) -> None:
|
||||
@@ -89,8 +89,7 @@ class GoogleSheetsSourceStandardTest(StandardSourceTestIface):
|
||||
def _get_tmp_dir():
|
||||
return "/test_root/gsheet_test"
|
||||
|
||||
@staticmethod
|
||||
def _create_spreadsheet(sheets_client: discovery.Resource) -> str:
|
||||
def _create_spreadsheet(self, sheets_client: GoogleSheetsClient) -> str:
|
||||
"""
|
||||
:return: spreadsheetId
|
||||
"""
|
||||
@@ -99,7 +98,7 @@ class GoogleSheetsSourceStandardTest(StandardSourceTestIface):
|
||||
"sheets": [{"properties": {"title": "sheet1"}}, {"properties": {"title": "sheet2"}}],
|
||||
}
|
||||
|
||||
spreadsheet = Spreadsheet.parse_obj(sheets_client.create(body=request).execute())
|
||||
spreadsheet = Spreadsheet.parse_obj(sheets_client.create(body=request))
|
||||
spreadsheet_id = spreadsheet.spreadsheetId
|
||||
|
||||
rows = [["header1", "irrelevant", "header3", "", "ignored"]]
|
||||
@@ -109,13 +108,13 @@ class GoogleSheetsSourceStandardTest(StandardSourceTestIface):
|
||||
rows.append(["", "", ""])
|
||||
rows.append(["orphan1", "orphan2", "orphan3"])
|
||||
|
||||
sheets_client.values().batchUpdate(
|
||||
sheets_client.update_values(
|
||||
spreadsheetId=spreadsheet_id,
|
||||
body={"data": {"majorDimension": "ROWS", "values": rows, "range": "sheet1"}, "valueInputOption": "RAW"},
|
||||
).execute()
|
||||
sheets_client.values().batchUpdate(
|
||||
)
|
||||
sheets_client.update_values(
|
||||
spreadsheetId=spreadsheet_id,
|
||||
body={"data": {"majorDimension": "ROWS", "values": rows, "range": "sheet2"}, "valueInputOption": "RAW"},
|
||||
).execute()
|
||||
)
|
||||
|
||||
return spreadsheet_id
|
||||
|
||||
@@ -33,6 +33,7 @@ setup(
|
||||
install_requires=[
|
||||
"airbyte-protocol",
|
||||
"base-python",
|
||||
"backoff",
|
||||
"requests",
|
||||
"google-auth-httplib2",
|
||||
"google-api-python-client",
|
||||
|
||||
@@ -23,9 +23,10 @@ SOFTWARE.
|
||||
"""
|
||||
|
||||
import unittest
|
||||
from unittest.mock import Mock
|
||||
from unittest.mock import Mock, patch
|
||||
|
||||
from airbyte_protocol import AirbyteRecordMessage, AirbyteStream, ConfiguredAirbyteCatalog, ConfiguredAirbyteStream
|
||||
from google_sheets_source.client import GoogleSheetsClient
|
||||
from google_sheets_source.helpers import Helpers
|
||||
from google_sheets_source.models import CellData, GridData, RowData, Sheet, SheetProperties, Spreadsheet
|
||||
|
||||
@@ -142,8 +143,10 @@ class TestHelpers(unittest.TestCase):
|
||||
|
||||
client = Mock()
|
||||
client.get.return_value.execute.return_value = fake_response
|
||||
|
||||
actual = Helpers.get_first_row(client, spreadsheet_id, sheet)
|
||||
with patch.object(GoogleSheetsClient, "__init__", lambda s, credentials, scopes: None):
|
||||
sheet_client = GoogleSheetsClient({"fake": "credentials"}, ["auth_scopes"])
|
||||
sheet_client.client = client
|
||||
actual = Helpers.get_first_row(sheet_client, spreadsheet_id, sheet)
|
||||
self.assertEqual(expected_first_row, actual)
|
||||
client.get.assert_called_with(spreadsheetId=spreadsheet_id, includeGridData=True, ranges=f"{sheet}!1:1")
|
||||
|
||||
@@ -154,8 +157,10 @@ class TestHelpers(unittest.TestCase):
|
||||
client.get.return_value.execute.return_value = Spreadsheet(
|
||||
spreadsheetId=spreadsheet_id, sheets=[Sheet(properties=SheetProperties(title=t)) for t in expected_sheets]
|
||||
)
|
||||
|
||||
actual_sheets = Helpers.get_sheets_in_spreadsheet(client, spreadsheet_id)
|
||||
with patch.object(GoogleSheetsClient, "__init__", lambda s, credentials, scopes: None):
|
||||
sheet_client = GoogleSheetsClient({"fake": "credentials"}, ["auth_scopes"])
|
||||
sheet_client.client = client
|
||||
actual_sheets = Helpers.get_sheets_in_spreadsheet(sheet_client, spreadsheet_id)
|
||||
|
||||
self.assertEqual(expected_sheets, actual_sheets)
|
||||
client.get.assert_called_with(spreadsheetId=spreadsheet_id, includeGridData=False)
|
||||
@@ -186,9 +191,11 @@ class TestHelpers(unittest.TestCase):
|
||||
|
||||
client = Mock()
|
||||
client.get.side_effect = mock_client_call
|
||||
|
||||
with patch.object(GoogleSheetsClient, "__init__", lambda s, credentials, scopes: None):
|
||||
sheet_client = GoogleSheetsClient({"fake": "credentials"}, ["auth_scopes"])
|
||||
sheet_client.client = client
|
||||
actual = Helpers.get_available_sheets_to_column_index_to_name(
|
||||
client, spreadsheet_id, {sheet1: frozenset(sheet1_first_row), "doesnotexist": frozenset(["1", "2"])}
|
||||
sheet_client, spreadsheet_id, {sheet1: frozenset(sheet1_first_row), "doesnotexist": frozenset(["1", "2"])}
|
||||
)
|
||||
expected = {sheet1: {0: "1", 1: "2", 2: "3", 3: "4"}}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user