342 lines
14 KiB
Python
342 lines
14 KiB
Python
#
|
|
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
|
|
#
|
|
|
|
|
|
import logging
|
|
from abc import ABC
|
|
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Tuple
|
|
|
|
import requests
|
|
|
|
from airbyte_cdk.sources import AbstractSource
|
|
from airbyte_cdk.sources.streams import Stream
|
|
from airbyte_cdk.sources.streams.http import HttpStream
|
|
|
|
from .auth import WebflowTokenAuthenticator
|
|
from .webflow_to_airbyte_mapping import WebflowToAirbyteMapping
|
|
|
|
|
|
"""
This module is used for pulling the contents of "collections" out of Webflow, which is a CMS for hosting websites.
A Webflow collection may be a group of items such as "Blog Posts", "Blog Authors", etc.
There may be many collections, each of which can have its own distinct schema. This module will dynamically figure out
which collections are available, and will dynamically create the schema for each collection based on information
extracted from Webflow. It will then download all of the items from all of the selected collections.

Because the amount of data is expected to be "small" (not TB of data), we have not implemented any kind of
incremental downloading of data from Webflow. Each time this code is executed, it will pull back all of the items
that are contained in each of the desired collections.
"""

# Webflow expects an 'accept-version' header with a value of '1.0.0' (as of May 2022).
# This is only the default: SourceWebflow.get_authenticator lets the config's "accept_version" override it.
WEBFLOW_ACCEPT_VERSION = "1.0.0"
|
|
|
|
|
|
# Basic full refresh stream
class WebflowStream(HttpStream, ABC):
    """
    This class represents a stream output by the connector.
    This is an abstract base class meant to contain all the common functionality at the API level e.g: the API base URL,
    pagination strategy, parsing responses etc..

    Each stream should extend this class (or another abstract subclass of it) to specify behavior unique to that stream.
    """

    url_base = "https://api.webflow.com/"

    # The following property is needed to fix what appears to be a bug in http.py line 119
    # Bug reported at: https://github.com/airbytehq/airbyte/issues/13283
    @property
    def authenticator(self) -> WebflowTokenAuthenticator:
        """Return the authenticator attached to this stream's underlying HTTP session."""
        return self._session.auth

    def request_params(
        self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
    ) -> MutableMapping[str, Any]:
        """
        Common params e.g. pagination size etc.

        The base stream sends no query parameters; streams that paginate (e.g. CollectionContents)
        override this method.
        """
        return {}
|
|
|
|
|
|
class CollectionSchema(WebflowStream):
    """
    Gets the schema of the current collection - see: https://docs.developers.webflow.com/v1.0.0/reference/get-collection, and
    then converts that schema to a json-schema.org-compatible schema that uses supported Airbyte types.

    More info about Webflow schema: https://docs.developers.webflow.com/v1.0.0/reference/get-collection
    Airbyte data types: https://docs.airbyte.com/understanding-airbyte/supported-data-types/
    """

    # primary_key is not used as we don't do incremental syncs - https://docs.airbyte.com/understanding-airbyte/connections/
    primary_key = None

    def __init__(self, collection_id: str = None, **kwargs):
        self.collection_id = collection_id
        super().__init__(**kwargs)

    def path(self, **kwargs) -> str:
        """
        See: https://docs.developers.webflow.com/v1.0.0/reference/get-collection
        Returns a collection with full schema by collection_id
        """
        return f"collections/{self.collection_id}"

    def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
        """
        Converts the webflow schema into an Airbyte-compatible schema

        Webflow schema API returns an array of fields contained in the "fields" field.
        Get field name and field type from this array, and then map it to an airbyte-supported type.
        Each yielded record is a single-entry mapping {field_slug: airbyte_type_schema}.

        :raises Exception: re-raises (after logging) any error hit while reading a field's "slug"/"type"
            or looking the type up in WebflowToAirbyteMapping.
        """
        response_json = response.json()
        for field in response_json["fields"]:
            # Bind before the try so the error message never references an unbound name (first field)
            # or a stale type left over from the previous field.
            field_type = None
            try:
                field_name = field["slug"]
                field_type = field["type"]
                airbyte_type = WebflowToAirbyteMapping.webflow_to_airbyte_mapping[field_type]
            except Exception:
                msg = (
                    f"Encountered an exception parsing schema for Webflow type: {field_type}. "
                    f'Is "{field_type}" defined in the mapping between Webflow and json schema?'
                )
                self.logger.exception(msg)
                # Don't eat the exception, raise it again as this needs to be fixed
                raise
            # Yield outside the try so errors thrown into the generator by the consumer are not mislabelled
            yield {field_name: airbyte_type}  # get records from the "fields" array

    def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
        """This API does not return any information to support pagination"""
        return {}
|
|
|
|
|
|
class CollectionsList(WebflowStream):
    """
    Lists the collections defined on a Webflow site.

    The data we usually want from Webflow lives in "Collections" (for example "Blog Posts" or
    "Blog Authors"). Each record emitted by this stream is the metadata object Webflow returns
    for one collection. More info: https://developers.webflow.com/#list-collections
    """

    # No primary key: this connector only performs full-refresh syncs -
    # https://docs.airbyte.com/understanding-airbyte/connections/
    primary_key = None

    def __init__(self, site_id: str = None, **kwargs):
        self.site_id = site_id
        super().__init__(**kwargs)

    def path(self, **kwargs) -> str:
        """
        Endpoint returning high-level information about every collection on the site.
        See: https://developers.webflow.com/#list-collections
        """
        return "sites/{}/collections".format(self.site_id)

    def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
        """The endpoint answers with a JSON array; emit each element as its own record."""
        for collection in response.json():
            yield collection

    def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
        """No pagination metadata is returned by this endpoint, so there is never a next page."""
        return {}
|
|
|
|
|
|
class CollectionContents(WebflowStream):
    """
    This stream is used for pulling "items" out of a given Webflow collection. Because there is not a fixed number of collections with
    pre-defined names, each stream is an object that uses the passed-in collection name for the stream name.

    Note that because the Webflow API works with collection ids rather than collection names, the collection id is
    used for hitting the Webflow API.

    An example of a collection is "Blog Posts", which contains a list of items, where each item is a JSON-representation of a blog article.
    """

    # primary_key is not used as we don't do incremental syncs - https://docs.airbyte.com/understanding-airbyte/connections/
    primary_key = None

    def __init__(self, site_id: str = None, collection_id: str = None, collection_name: str = None, **kwargs):
        """override __init__ to add collection-related variables"""
        self.site_id = site_id
        super().__init__(**kwargs)
        self.collection_name = collection_name
        self.collection_id = collection_id

    @property
    def name(self) -> str:
        """The stream is named after the Webflow collection (e.g. "Blog Posts"), not its id."""
        return self.collection_name

    def path(self, **kwargs) -> str:
        """
        The path to get the "items" in the requested collection uses the "_id" of the collection in the URL.
        See: https://developers.webflow.com/#items

        return collections/<collection_id>/items
        """
        return f"collections/{self.collection_id}/items"

    def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
        """
        Webflow uses offset-based pagination: https://developers.webflow.com/#item-model

        Returns {"offset": <next offset>} while more items may exist, otherwise {} to stop paginating.
        When the response reports a "total", pagination stops as soon as that many items have been
        covered, avoiding a final request that would only return an empty page.
        """
        decoded_response = response.json()
        count = decoded_response.get("count", 0)
        items = decoded_response.get("items", [])
        # An empty (or null) page means we are past the end of the collection.
        if not count or not items:
            return {}

        # "offset" is read strictly: defaulting it could re-request the same page forever.
        offset = decoded_response["offset"] + count
        total = decoded_response.get("total")
        if total is not None and offset >= total:
            return {}
        return {"offset": offset}

    def request_params(
        self,
        stream_state: Mapping[str, Any],
        stream_slice: Mapping[str, Any] = None,
        next_page_token: Mapping[str, Any] = None,
    ) -> MutableMapping[str, Any]:
        """Request 100 items per page and, after the first page, the offset produced by next_page_token."""
        # 100 is Webflow's default page size; lower it temporarily only when debugging pagination.
        params = {"limit": 100}

        # Handle pagination by inserting the next page's token in the request parameters
        if next_page_token:
            params.update(next_page_token)

        return params

    def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
        """
        Webflow items API returns an array of items contained in the "items" field.
        """
        response_json = response.json()
        # The items API returns records inside a container list called "items"
        yield from response_json["items"]

    def get_json_schema(self) -> Mapping[str, Any]:
        """
        Webflow has an API, but it is not consistent with json-schema.org schemas. We use the CollectionSchema stream
        to get these schemas and to also map them to json-schema format.
        """
        schema_stream = CollectionSchema(authenticator=self._session.auth, collection_id=self.collection_id)
        schema_records = schema_stream.read_records(sync_mode="full_refresh")

        # each record corresponds to a property in the json schema. So we loop over each of these properties
        # and add it to the json schema.
        json_schema = {}
        for schema_property in schema_records:
            json_schema.update(schema_property)

        # Manually add in _id, _cid and _locale, which are not included in the list of fields sent back from Webflow;
        # _id and _cid are necessary for joining data in the database.
        extra_fields = {
            "_id": {"type": ["null", "string"]},
            "_cid": {"type": ["null", "string"]},
            "_locale": {"type": ["null", "string"]},
        }
        json_schema.update(extra_fields)

        return {
            "$schema": "http://json-schema.org/draft-07/schema#",
            "additionalProperties": True,
            "type": "object",
            "properties": json_schema,
        }
|
|
|
|
|
|
class SourceWebflow(AbstractSource):
    """This is the main class that defines the methods that will be called by Airbyte infrastructure"""

    @staticmethod
    def _get_collection_name_to_id_dict(
        authenticator: WebflowTokenAuthenticator = None, site_id: str = None
    ) -> Mapping[str, str]:
        """
        Most of the Webflow APIs require the collection id, but the streams that we are generating use the collection name.
        This function will return a dictionary containing collection_name: collection_id entries.
        If two collections share a name, the one listed last wins.
        """
        collections_stream = CollectionsList(authenticator=authenticator, site_id=site_id)
        collections_records = collections_stream.read_records(sync_mode="full_refresh")

        # name -> _id for every collection returned by the CollectionsList stream
        return {collection_obj["name"]: collection_obj["_id"] for collection_obj in collections_records}

    @staticmethod
    def get_authenticator(config):
        """
        Verifies that the information for setting the header has been set, and returns a class
        which overloads that standard authentication to include additional headers that are required by Webflow.

        :raises Exception: if 'api_key' is missing or empty in config.
        """
        api_key = config.get("api_key", None)
        accept_version = config.get("accept_version", WEBFLOW_ACCEPT_VERSION)
        if not api_key:
            raise Exception("Config validation error: 'api_key' is a required property")

        auth = WebflowTokenAuthenticator(token=api_key, accept_version=accept_version)
        return auth

    def check_connection(self, logger: logging.Logger, config: Mapping[str, Any]) -> Tuple[bool, Any]:
        """
        A check to validate that the user-provided config can be used to connect to the underlying API

        :param config: the user-input config object conforming to the connector's spec.yaml
        :param logger: logger object
        :return Tuple[bool, any]: (True, None) if the input config can be used to connect to the API successfully, (False, error) otherwise.
        """
        try:
            # Check that authenticator can be retrieved
            auth = self.get_authenticator(config)
            site_id = config.get("site_id")
            # Without a site id the request would target "sites/None/collections"; report it clearly instead.
            if not site_id:
                return False, "Config validation error: 'site_id' is a required property"

            collections_stream = CollectionsList(authenticator=auth, site_id=site_id)
            collections_records = collections_stream.read_records(sync_mode="full_refresh")
            # Use a default so an empty collection list doesn't surface as a message-less StopIteration.
            record = next(iter(collections_records), None)
            if record is None:
                return False, f"The Webflow API returned no collections for site_id '{site_id}'"
            logger.info(f"Successfully connected to CollectionsList stream. Pulled one record: {record}")
            return True, None
        except Exception as e:
            return False, e

    def generate_streams(self, authenticator: WebflowTokenAuthenticator, site_id: str) -> Iterable[Stream]:
        """Lazily yield one CollectionContents stream per collection found on the site."""
        collection_name_to_id_dict = self._get_collection_name_to_id_dict(authenticator=authenticator, site_id=site_id)

        for collection_name, collection_id in collection_name_to_id_dict.items():
            yield CollectionContents(
                authenticator=authenticator,
                site_id=site_id,
                collection_id=collection_id,
                collection_name=collection_name,
            )

    def streams(self, config: Mapping[str, Any]) -> List[Stream]:
        """
        :param config: A Mapping of the user input configuration as defined in the connector spec.
        :return List[Stream]: A list of the streams that Airbyte can pull data from.
        """
        auth = self.get_authenticator(config)
        site_id = config.get("site_id")

        # We _dynamically_ generate streams that correspond to Webflow collections (eg. Blog Authors, Blog Posts, etc.)
        # Materialize into a list so the return value matches the declared type and can be iterated more than once.
        return list(self.generate_streams(authenticator=auth, site_id=site_id))
|