109 lines
3.7 KiB
Python
109 lines
3.7 KiB
Python
#
|
|
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
|
|
#
|
|
|
|
# Handles serializing any fauna document into an airbyte record
|
|
|
|
import base64
|
|
from datetime import date
|
|
|
|
from faunadb import _json
|
|
from faunadb.objects import FaunaTime, Query, Ref, SetRef
|
|
|
|
|
|
def fauna_doc_to_airbyte(doc: dict) -> dict:
|
|
"""
|
|
Converts a full fauna document into the airbyte representation.
|
|
|
|
This will mutate and return the input `doc`. If you don't want this behavior, deep copy
|
|
the dict before passing it in.
|
|
"""
|
|
for k, v in doc.items():
|
|
doc[k] = _fauna_value_to_airbyte(v)
|
|
return doc
|
|
|
|
|
|
def _fauna_value_to_airbyte(value: any) -> any:
|
|
"""
|
|
Converts a fauna document to an airbyte-serializable document. This will simply replace
|
|
all FaunaTime, Ref, dates, and byte arrays with native json objects.
|
|
|
|
This will mutate `value` (if possible), and return the new `value`.
|
|
"""
|
|
if isinstance(value, dict):
|
|
for k, v in value.items():
|
|
value[k] = _fauna_value_to_airbyte(v)
|
|
return value
|
|
elif isinstance(value, list):
|
|
for i, v in enumerate(value):
|
|
value[i] = _fauna_value_to_airbyte(v)
|
|
return value
|
|
elif isinstance(value, Ref):
|
|
# serialize this however we feel like
|
|
return ref_to_airbyte(value)
|
|
elif isinstance(value, (Query, SetRef)):
|
|
# for these we give up :P
|
|
return _json.to_json(value)
|
|
elif isinstance(value, FaunaTime):
|
|
# this matches the airbyte `date-time` spec
|
|
return value.value
|
|
elif isinstance(value, date):
|
|
# this matches the airbyte `date` spec
|
|
return value.isoformat()
|
|
elif isinstance(value, (bytes, bytearray)):
|
|
# airbyte has no byte arrays, so this is just a string
|
|
return base64.urlsafe_b64encode(value).decode("utf-8")
|
|
else:
|
|
# if its anything else, we don't mutate it, and let the json
|
|
# serializer deal with it.
|
|
return value
|
|
|
|
|
|
def ref_to_airbyte(ref) -> dict:
|
|
# Note that the ref.database() field is never set, so we ignore it.
|
|
if ref.collection() is None:
|
|
# We have no nesting on this ref. Therefore, it is invalid, so
|
|
# we return an unknown type.
|
|
return {"id": ref.id(), "type": "unknown"}
|
|
elif ref.collection().collection() is None:
|
|
# We have a singly nested ref.
|
|
# Example: Ref("my_collection", Ref("collections"))
|
|
# or: Ref("my_index", Ref("indexes"))
|
|
collection_names = {
|
|
"collections": "collection",
|
|
"databases": "database",
|
|
"indexes": "index",
|
|
"functions": "function",
|
|
"roles": "role",
|
|
"access_providers": "access_provider",
|
|
"keys": "key",
|
|
"tokens": "token",
|
|
"credentials": "credential",
|
|
}
|
|
return {
|
|
"id": ref.id(),
|
|
"type": collection_names.get(
|
|
# Use the collection id as the key in the above map
|
|
ref.collection().id(),
|
|
# If that fails, we have an invalid ref, so we fallback to this id.
|
|
ref.collection().id(),
|
|
),
|
|
}
|
|
elif (ref.collection().collection().collection() is None) and (ref.collection().collection().id() == "collections"):
|
|
# This is a document.
|
|
#
|
|
# Example: Ref("1234", Ref("collection_name", Ref("collections")))
|
|
return {
|
|
"id": ref.id(),
|
|
"collection": ref.collection().id(),
|
|
"type": "document",
|
|
}
|
|
else:
|
|
# We have a tripply nested ref, so we are in undefined behavior.
|
|
#
|
|
# Just try our best, and produce `type: unknown`
|
|
return {
|
|
"id": ref.id(),
|
|
"type": "unknown",
|
|
}
|