136 lines
5.7 KiB
Python
136 lines
5.7 KiB
Python
#
|
|
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
|
|
#
|
|
|
|
|
|
import json
|
|
import logging
|
|
from datetime import datetime
|
|
from typing import Dict, Generator
|
|
|
|
from airbyte_cdk.models import (
|
|
AirbyteCatalog,
|
|
AirbyteConnectionStatus,
|
|
AirbyteMessage,
|
|
AirbyteRecordMessage,
|
|
AirbyteStream,
|
|
ConfiguredAirbyteCatalog,
|
|
Status,
|
|
Type,
|
|
)
|
|
from airbyte_cdk.sources import Source
|
|
|
|
from .firebase_rtdb import Client
|
|
|
|
|
|
class SourceFirebaseRealtimeDatabase(Source):
    """Airbyte source that reads one node of a Firebase Realtime Database.

    The connector exposes exactly one stream. Its name is derived from the
    configured ``path`` (see :meth:`stream_name_from`) and its records have a
    string ``key`` and a string ``value`` property.
    """

    # Fallback for the optional ``buffer_size`` config entry; forwarded to ``Client``.
    DEFAULT_BUFFER_SIZE = 10000
    # Fallback for the optional ``path`` config entry. With an empty path the
    # stream name falls back to the database name.
    DEFAULT_PATH = ""

    @staticmethod
    def stream_name_from(config):
        """
        Derive the stream name from the connector configuration.

        The name is the last segment of ``config["path"]`` (trailing slashes
        ignored). When the path is missing, null, empty or consists only of
        slashes, ``config["database_name"]`` is used instead. Hyphens are
        replaced by underscores in the result.

        :param config: Mapping with the connector configuration
        :return: The stream name as a string
        :raises KeyError: if no path segment is available and ``database_name`` is absent
        """
        # ``or`` (rather than a .get default) also covers an explicit null path,
        # which would otherwise fail on ``.rstrip``.
        path = config.get("path") or SourceFirebaseRealtimeDatabase.DEFAULT_PATH
        node_name = path.rstrip("/").split("/")[-1]
        if not node_name:
            node_name = config["database_name"]

        return node_name.replace("-", "_")

    def check(self, logger: logging.Logger, config: json) -> AirbyteConnectionStatus:
        """
        Tests if the input configuration can be used to successfully connect to Firebase Realtime Database

        :param logger: Logging object to display debug/info/error to the logs
            (logs will not be accessible via airbyte UI if they are not passed to this logger)
        :param config: Json object containing the configuration of this source, content of this json is as specified in
            the properties of the spec.yaml file

        :return: AirbyteConnectionStatus indicating a Success or Failure
        """
        # Broad catch is deliberate: any failure (missing config key, bad
        # credentials, connectivity) is reported as a FAILED status instead of
        # propagating out of the connection check.
        try:
            database_name = config["database_name"]
            google_application_credentials = config["google_application_credentials"]

            client = Client()
            client.initialize(database_name, google_application_credentials)

            # get root-node's keys to check connectivity
            client.check_connection()
        except Exception as e:
            return AirbyteConnectionStatus(status=Status.FAILED, message=f"An exception occurred: {e!s}")

        return AirbyteConnectionStatus(status=Status.SUCCEEDED)

    def discover(self, logger: logging.Logger, config: json) -> AirbyteCatalog:
        """
        Returns an AirbyteCatalog representing the available streams and fields in the user's connection to Firebase.
        This connector returns only one stream that is specified by `path` in the config.

        :param logger: Logging object to display debug/info/error to the logs
            (logs will not be accessible via airbyte UI if they are not passed to this logger)
        :param config: Json object containing the configuration of this source, content of this json is as specified in
            the properties of the spec.yaml file

        :return: AirbyteCatalog is an object describing a list of all available streams in this source.
            A stream is an AirbyteStream object that includes:
            - its stream name (or table name in the case of Postgres)
            - json_schema providing the specifications of expected schema for this stream (a list of columns described
            by their names and types)
        """
        # Every record is described as a pair of string properties.
        json_schema = {
            "$schema": "http://json-schema.org/draft-07/schema#",
            "type": "object",
            "properties": {
                "key": {"type": "string"},
                "value": {"type": "string"},
            },
        }

        stream = AirbyteStream(
            name=self.stream_name_from(config),
            json_schema=json_schema,
            supported_sync_modes=["full_refresh"],
        )

        return AirbyteCatalog(streams=[stream])

    # NOTE(review): ``any`` in the ``state`` annotation is the builtin function;
    # ``typing.Any`` was presumably intended. Left unchanged to keep the signature as is.
    def read(
        self, logger: logging.Logger, config: json, catalog: ConfiguredAirbyteCatalog, state: Dict[str, any]
    ) -> Generator[AirbyteMessage, None, None]:
        """
        Returns a generator of the AirbyteMessages generated by reading the source with the given configuration,
        catalog, and state.

        Only the first stream of the configured catalog is read, and its name is used for every emitted record.
        ``state`` is not used. If the configured catalog contains no streams, nothing is emitted.

        :param logger: Logging object to display debug/info/error to the logs
            (logs will not be accessible via airbyte UI if they are not passed to this logger)
        :param config: Json object containing the configuration of this source, content of this json is as specified in
            the properties of the spec.yaml file
        :param catalog: The input catalog is a ConfiguredAirbyteCatalog which is almost the same as AirbyteCatalog
            returned by discover(), but in addition, it's been configured in the UI
        :param state: Checkpoint state from previous runs; accepted for interface compatibility and ignored here

        :return: A generator that produces a stream of AirbyteRecordMessage contained in AirbyteMessage object.
        """
        # Guard against an empty configured catalog instead of failing with IndexError.
        if not catalog.streams:
            logger.info("No streams configured in the catalog; nothing to read.")
            return

        stream_name = catalog.streams[0].stream.name

        buffer_size = config.get("buffer_size", self.DEFAULT_BUFFER_SIZE)
        # Same null-tolerant fallback as in stream_name_from.
        path = config.get("path") or self.DEFAULT_PATH
        database_name = config["database_name"]
        google_application_credentials = config["google_application_credentials"]

        client = Client(path, buffer_size)
        client.initialize(database_name, google_application_credentials)

        for data in client.extract():
            # Scale to milliseconds BEFORE truncating to int, otherwise the
            # sub-second part is discarded and the value always ends in 000.
            emitted_at = int(datetime.now().timestamp() * 1000)
            yield AirbyteMessage(
                type=Type.RECORD,
                record=AirbyteRecordMessage(stream=stream_name, data=data, emitted_at=emitted_at),
            )
|