1
0
mirror of synced 2025-12-20 18:39:31 -05:00
Files
airbyte/airbyte-integrations/connectors/source-fauna/source_fauna/serialize.py
Cole Snodgrass 2e099acc52 update headers from 2022 -> 2023 (#22594)
* It's 2023!

* 2022 -> 2023

---------

Co-authored-by: evantahler <evan@airbyte.io>
2023-02-08 13:01:16 -08:00

109 lines
3.7 KiB
Python

#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
# Handles serializing any fauna document into an airbyte record
import base64
from datetime import date
from faunadb import _json
from faunadb.objects import FaunaTime, Query, Ref, SetRef
def fauna_doc_to_airbyte(doc: dict) -> dict:
"""
Converts a full fauna document into the airbyte representation.
This will mutate and return the input `doc`. If you don't want this behavior, deep copy
the dict before passing it in.
"""
for k, v in doc.items():
doc[k] = _fauna_value_to_airbyte(v)
return doc
def _fauna_value_to_airbyte(value: any) -> any:
"""
Converts a fauna document to an airbyte-serializable document. This will simply replace
all FaunaTime, Ref, dates, and byte arrays with native json objects.
This will mutate `value` (if possible), and return the new `value`.
"""
if isinstance(value, dict):
for k, v in value.items():
value[k] = _fauna_value_to_airbyte(v)
return value
elif isinstance(value, list):
for i, v in enumerate(value):
value[i] = _fauna_value_to_airbyte(v)
return value
elif isinstance(value, Ref):
# serialize this however we feel like
return ref_to_airbyte(value)
elif isinstance(value, (Query, SetRef)):
# for these we give up :P
return _json.to_json(value)
elif isinstance(value, FaunaTime):
# this matches the airbyte `date-time` spec
return value.value
elif isinstance(value, date):
# this matches the airbyte `date` spec
return value.isoformat()
elif isinstance(value, (bytes, bytearray)):
# airbyte has no byte arrays, so this is just a string
return base64.urlsafe_b64encode(value).decode("utf-8")
else:
# if its anything else, we don't mutate it, and let the json
# serializer deal with it.
return value
def ref_to_airbyte(ref) -> dict:
# Note that the ref.database() field is never set, so we ignore it.
if ref.collection() is None:
# We have no nesting on this ref. Therefore, it is invalid, so
# we return an unknown type.
return {"id": ref.id(), "type": "unknown"}
elif ref.collection().collection() is None:
# We have a singly nested ref.
# Example: Ref("my_collection", Ref("collections"))
# or: Ref("my_index", Ref("indexes"))
collection_names = {
"collections": "collection",
"databases": "database",
"indexes": "index",
"functions": "function",
"roles": "role",
"access_providers": "access_provider",
"keys": "key",
"tokens": "token",
"credentials": "credential",
}
return {
"id": ref.id(),
"type": collection_names.get(
# Use the collection id as the key in the above map
ref.collection().id(),
# If that fails, we have an invalid ref, so we fallback to this id.
ref.collection().id(),
),
}
elif (ref.collection().collection().collection() is None) and (ref.collection().collection().id() == "collections"):
# This is a document.
#
# Example: Ref("1234", Ref("collection_name", Ref("collections")))
return {
"id": ref.id(),
"collection": ref.collection().id(),
"type": "document",
}
else:
# We have a tripply nested ref, so we are in undefined behavior.
#
# Just try our best, and produce `type: unknown`
return {
"id": ref.id(),
"type": "unknown",
}