138 lines
4.8 KiB
Python
138 lines
4.8 KiB
Python
#
|
|
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
|
|
#
|
|
|
|
from typing import Any, Dict
|
|
|
|
import requests
|
|
|
|
from airbyte_cdk.models import AirbyteStream
|
|
from airbyte_cdk.models.airbyte_protocol import DestinationSyncMode, SyncMode
|
|
from airbyte_cdk.sources.streams.http.requests_native_auth import TokenAuthenticator
|
|
|
|
|
|
class Helpers(object):
|
|
base_url = "https://api.gridly.com/v1/"
|
|
|
|
@staticmethod
|
|
def view_detail_url(view_id: str) -> str:
|
|
return Helpers.base_url + f"views/{view_id}"
|
|
|
|
@staticmethod
|
|
def view_list_url(grid_id: str) -> str:
|
|
return Helpers.base_url + f"views?gridId={grid_id}"
|
|
|
|
@staticmethod
|
|
def grid_detail_url(grid_id: str) -> str:
|
|
return Helpers.base_url + f"grids/{grid_id}"
|
|
|
|
@staticmethod
|
|
def get_views(auth: TokenAuthenticator, grid_id: str) -> Dict[str, Any]:
|
|
url = Helpers.view_list_url(grid_id)
|
|
try:
|
|
response = requests.get(url, headers=auth.get_auth_header())
|
|
response.raise_for_status()
|
|
except requests.exceptions.HTTPError as e:
|
|
if e.response.status_code == 401:
|
|
raise Exception("Invalid API Key")
|
|
elif e.response.status_code == 404:
|
|
raise Exception(f"Grid id '{grid_id}' not found")
|
|
else:
|
|
raise Exception(f"Error getting listing views of grid '{grid_id}'")
|
|
|
|
return response.json()
|
|
|
|
@staticmethod
|
|
def get_grid(auth: TokenAuthenticator, grid_id: str) -> Dict[str, Any]:
|
|
url = Helpers.grid_detail_url(grid_id)
|
|
try:
|
|
response = requests.get(url, headers=auth.get_auth_header())
|
|
response.raise_for_status()
|
|
except requests.exceptions.HTTPError as e:
|
|
if e.response.status_code == 401:
|
|
raise Exception("Invalid API Key")
|
|
elif e.response.status_code == 404:
|
|
raise Exception(f"Grid '{grid_id}' not found")
|
|
else:
|
|
raise Exception(f"Error getting grid {grid_id}: {e}")
|
|
return response.json()
|
|
|
|
@staticmethod
|
|
def get_view(auth: TokenAuthenticator, view_id: str) -> Dict[str, Any]:
|
|
url = Helpers.view_detail_url(view_id)
|
|
try:
|
|
response = requests.get(url, headers=auth.get_auth_header())
|
|
response.raise_for_status()
|
|
except requests.exceptions.HTTPError as e:
|
|
if e.response.status_code == 401:
|
|
raise Exception("Invalid API Key")
|
|
elif e.response.status_code == 404:
|
|
raise Exception(f"View '{view_id}' not found")
|
|
else:
|
|
raise Exception(f"Error getting view {view_id}: {e}")
|
|
return response.json()
|
|
|
|
@staticmethod
|
|
def to_airbyte_data_type(column_type: str) -> str:
|
|
if column_type == "number":
|
|
return "number"
|
|
elif column_type == "boolean":
|
|
return "boolean"
|
|
else:
|
|
return "string"
|
|
|
|
@staticmethod
|
|
def get_json_schema(view: Dict[str, Any]) -> Dict[str, str]:
|
|
columns = view.get("columns", {})
|
|
properties = {}
|
|
|
|
for column in columns:
|
|
column_id = column.get("id")
|
|
column_type = column.get("type", "singleLine")
|
|
properties[column_id] = {"type": ["null", Helpers.to_airbyte_data_type(column_type)]}
|
|
|
|
json_schema = {
|
|
"$schema": "http://json-schema.org/draft-07/schema#",
|
|
"type": "object",
|
|
"properties": properties,
|
|
}
|
|
return json_schema
|
|
|
|
@staticmethod
|
|
def get_airbyte_stream(view: Dict[str, Any]) -> AirbyteStream:
|
|
view_name = view.get("name")
|
|
columns = view.get("columns", {})
|
|
properties = {}
|
|
|
|
for column in columns:
|
|
column_id = column.get("id")
|
|
column_type = column.get("type", "singleLine")
|
|
properties[column_id] = {"type": ["null", Helpers.to_airbyte_data_type(column_type)]}
|
|
|
|
json_schema = Helpers.get_json_schema(view)
|
|
|
|
return AirbyteStream(
|
|
name=view_name,
|
|
json_schema=json_schema,
|
|
supported_sync_modes=[SyncMode.full_refresh],
|
|
supported_destination_sync_modes=[DestinationSyncMode.overwrite, DestinationSyncMode.append_dedup],
|
|
)
|
|
|
|
@staticmethod
|
|
def transform_record(record: Dict[str, Any], schema: Dict[str, Any]) -> Dict[str, Any]:
|
|
schema_properties = schema.get("properties")
|
|
columns = [k for k, v in schema_properties.items()]
|
|
|
|
cells = record.get("cells")
|
|
|
|
transformed_record = {}
|
|
if "_recordId" in columns:
|
|
transformed_record["_recordId"] = record.get("id")
|
|
if "_path" in columns:
|
|
transformed_record["_path"] = record.get("path", "")
|
|
|
|
for cell in cells:
|
|
transformed_record[cell.get("columnId")] = cell.get("value")
|
|
|
|
return transformed_record
|