81 lines
3.1 KiB
Python
81 lines
3.1 KiB
Python
# Copyright (c) 2024 Couchbase, Inc., all rights reserved.
|
|
|
|
import logging
|
|
from datetime import timedelta
|
|
from typing import Any, List, Mapping, Tuple
|
|
|
|
from couchbase.auth import PasswordAuthenticator
|
|
from couchbase.cluster import Cluster
|
|
from couchbase.options import ClusterOptions
|
|
|
|
from airbyte_cdk.sources import AbstractSource
|
|
from airbyte_cdk.sources.streams import Stream
|
|
|
|
from .streams import DocumentStream
|
|
|
|
|
|
class SourceCouchbase(AbstractSource):
    """Airbyte source that exposes every collection of a Couchbase bucket as a stream.

    Connection settings are copied from the connector config onto the instance
    by ``_set_config_values`` and then used by ``_get_cluster`` to open a
    cluster connection.
    """

    def __init__(self):
        super().__init__()
        # All four settings stay ``None`` until ``_set_config_values`` runs.
        self.connection_string = None
        self.username = None
        self.password = None
        self.bucket_name = None

    @property
    def name(self) -> str:
        """Return the fixed display name of this source."""
        return "Couchbase"

    def _set_config_values(self, config: Mapping[str, Any]) -> None:
        """Copy the connection settings from ``config`` onto the instance.

        Args:
            config: Connector configuration; must contain the keys
                ``connection_string``, ``username``, ``password`` and ``bucket``.

        Raises:
            KeyError: If any of the required keys is absent.
        """
        self.connection_string = config["connection_string"]
        self.username = config["username"]
        self.password = config["password"]
        self.bucket_name = config["bucket"]

    def _get_cluster(self) -> Cluster:
        """Open a cluster connection using the stored credentials.

        The connection uses password authentication with the SDK's
        ``wan_development`` configuration profile applied, and the call blocks
        in ``wait_until_ready`` for up to 5 seconds before returning.

        Returns:
            The connected ``Cluster`` object.
        """
        auth = PasswordAuthenticator(self.username, self.password)
        options = ClusterOptions(auth)
        options.apply_profile("wan_development")
        cluster = Cluster(self.connection_string, options)
        cluster.wait_until_ready(timedelta(seconds=5))
        return cluster

    @staticmethod
    def _ensure_primary_index(cluster: Cluster, bucket: str, scope: str, collection: str):
        """Create a primary index on the given collection if it does not exist.

        This is best-effort: any failure is logged as a warning and swallowed
        so that stream discovery can continue for the remaining collections.

        Args:
            cluster: Connected cluster used to run the query.
            bucket: Bucket name.
            scope: Scope name inside the bucket.
            collection: Collection name inside the scope.
        """
        index_name = f"{bucket}_{scope}_{collection}_primary_index"
        query = f"CREATE PRIMARY INDEX IF NOT EXISTS `{index_name}` ON `{bucket}`.`{scope}`.`{collection}`"
        logging.debug(f"Executing query to ensure primary index: {query}")
        try:
            cluster.query(query).execute()
            logging.debug(f"Successfully ensured primary index for {bucket}.{scope}.{collection}")
        except Exception as e:
            # Deliberately broad: index creation must never abort discovery.
            logging.warning(f"Failed to create primary index for {bucket}.{scope}.{collection}: {str(e)}")

    def check_connection(self, logger: logging.Logger, config: Mapping[str, Any]) -> Tuple[bool, Any]:
        """Verify that the configured cluster and bucket are reachable.

        Args:
            logger: Logger used to report the outcome.
            config: Connector configuration (see ``_set_config_values``).

        Returns:
            ``(True, None)`` when the bucket answers a ping, otherwise
            ``(False, message)`` describing the failure. The method reports
            failures through the return value rather than raising, including
            the case of an incomplete ``config``.
        """
        # Read the config inside its own guard so that a missing field is
        # reported as a failed check instead of escaping as a raw KeyError.
        try:
            self._set_config_values(config)
        except KeyError as e:
            message = f"Connection check failed: missing required config field {e}"
            logger.error(message)
            return False, message

        try:
            cluster = self._get_cluster()
            bucket = cluster.bucket(self.bucket_name)
            bucket.ping()
            logger.info("Successfully connected to Couchbase cluster and bucket")
            return True, None
        except Exception as e:
            logger.error(f"Connection check failed: {str(e)}")
            return False, f"Connection check failed: {str(e)}"

    def streams(self, config: Mapping[str, Any]) -> List[Stream]:
        """Build one ``DocumentStream`` per collection in the configured bucket.

        For every scope and collection reported by the bucket's collection
        manager, a primary index is ensured (best-effort) before the stream
        for that collection is created.

        Args:
            config: Connector configuration (see ``_set_config_values``).

        Returns:
            The list of streams, in the order the scopes and collections were
            returned by the cluster.
        """
        self._set_config_values(config)
        cluster = self._get_cluster()
        bucket = cluster.bucket(self.bucket_name)
        streams = []

        for scope in bucket.collections().get_all_scopes():
            for collection in scope.collections:
                self._ensure_primary_index(cluster, self.bucket_name, scope.name, collection.name)
                stream = DocumentStream(cluster, self.bucket_name, scope.name, collection.name)
                streams.append(stream)
                logging.info(f"Added stream for {scope.name}.{collection.name}")

        logging.info(f"Generated {len(streams)} streams")
        return streams
|