1
0
mirror of synced 2025-12-19 18:14:56 -05:00
Files
Kaustav Ghosh 6a7264148d (Source) - Couchbase connector 🎉 (#45876)
Co-authored-by: Denis Rosa <deniswsrosa@gmail.com>
2025-03-07 13:35:56 -03:00

81 lines
3.1 KiB
Python

# Copyright (c) 2024 Couchbase, Inc., all rights reserved.
import logging
from datetime import timedelta
from typing import Any, List, Mapping, Tuple
from couchbase.auth import PasswordAuthenticator
from couchbase.cluster import Cluster
from couchbase.options import ClusterOptions
from airbyte_cdk.sources import AbstractSource
from airbyte_cdk.sources.streams import Stream
from .streams import DocumentStream
class SourceCouchbase(AbstractSource):
    """Airbyte source exposing each collection of one Couchbase bucket as a stream.

    Connection settings are copied from the connector config keys
    ``connection_string``, ``username``, ``password`` and ``bucket`` onto the
    instance (see ``_set_config_values``) before each check or discovery.
    """

    def __init__(self):
        super().__init__()
        # Filled in from the connector config by _set_config_values().
        self.connection_string = None
        self.username = None
        self.password = None
        self.bucket_name = None

    @property
    def name(self) -> str:
        """Display name of this source."""
        return "Couchbase"

    def _set_config_values(self, config: Mapping[str, Any]):
        """Copy the connection settings out of ``config`` onto the instance.

        Raises:
            KeyError: if ``connection_string``, ``username``, ``password`` or
                ``bucket`` is missing from ``config``.
        """
        self.connection_string = config["connection_string"]
        self.username = config["username"]
        self.password = config["password"]
        self.bucket_name = config["bucket"]

    def _get_cluster(self, ready_timeout: timedelta = timedelta(seconds=5)) -> Cluster:
        """Connect to the configured cluster and wait until it reports ready.

        Args:
            ready_timeout: How long to wait for the cluster to become ready
                (defaults to 5 seconds, the previously hard-coded value).

        Returns:
            A connected ``Cluster``. The caller owns it and must close it when
            it is no longer needed.

        Raises:
            Whatever ``Cluster(...)`` or ``Cluster.wait_until_ready`` raise. If
            the wait fails, the cluster is closed before the error propagates
            so the connection is not leaked.
        """
        auth = PasswordAuthenticator(self.username, self.password)
        options = ClusterOptions(auth)
        # Per the Couchbase SDK docs, this profile relaxes default timeouts for
        # high-latency (WAN) links.
        options.apply_profile("wan_development")
        cluster = Cluster(self.connection_string, options)
        try:
            cluster.wait_until_ready(ready_timeout)
        except Exception:
            # Release the half-open connection, then surface the original error.
            self._close_quietly(cluster)
            raise
        return cluster

    @staticmethod
    def _close_quietly(cluster: Cluster) -> None:
        """Close ``cluster``, logging instead of raising if the close itself fails."""
        try:
            cluster.close()
        except Exception as e:
            logging.warning(f"Failed to close Couchbase cluster connection: {str(e)}")

    @staticmethod
    def _ensure_primary_index(cluster: Cluster, bucket: str, scope: str, collection: str):
        """Best-effort creation of a primary index on ``bucket.scope.collection``.

        Issues ``CREATE PRIMARY INDEX IF NOT EXISTS``, so repeated calls are
        harmless. Any failure is logged as a warning and otherwise ignored.
        Presumably the index is needed by DocumentStream's queries; verify
        this against the stream implementation.
        """
        index_name = f"{bucket}_{scope}_{collection}_primary_index"
        # Every identifier is backtick-quoted so names containing characters
        # such as '-' (common in bucket names) still parse.
        query = f"CREATE PRIMARY INDEX IF NOT EXISTS `{index_name}` ON `{bucket}`.`{scope}`.`{collection}`"
        logging.debug(f"Executing query to ensure primary index: {query}")
        try:
            cluster.query(query).execute()
            logging.debug(f"Successfully ensured primary index for {bucket}.{scope}.{collection}")
        except Exception as e:
            logging.warning(f"Failed to create primary index for {bucket}.{scope}.{collection}: {str(e)}")

    def check_connection(self, logger: logging.Logger, config: Mapping[str, Any]) -> Tuple[bool, Any]:
        """Check that the cluster is reachable and the configured bucket answers a ping.

        Returns:
            ``(True, None)`` on success, or ``(False, message)`` describing the
            failure.

        Raises:
            KeyError: if a required config key is missing. Config parsing
                happens outside the guarded section.
        """
        self._set_config_values(config)
        cluster = None
        try:
            cluster = self._get_cluster()
            bucket = cluster.bucket(self.bucket_name)
            bucket.ping()
            logger.info("Successfully connected to Couchbase cluster and bucket")
            return True, None
        except Exception as e:
            logger.error(f"Connection check failed: {str(e)}")
            return False, f"Connection check failed: {str(e)}"
        finally:
            # The cluster is only needed for this check, so always release it.
            if cluster is not None:
                self._close_quietly(cluster)

    def streams(self, config: Mapping[str, Any]) -> List[Stream]:
        """Build one DocumentStream per collection in every scope of the configured bucket.

        Side effect: requests a primary index for each collection through
        ``_ensure_primary_index`` (best-effort).

        The cluster connection is shared by all returned streams, so it stays
        open on success. It is closed only if discovery itself fails.
        """
        self._set_config_values(config)
        cluster = self._get_cluster()
        try:
            bucket = cluster.bucket(self.bucket_name)
            streams = []
            # NOTE(review): get_all_scopes() returns every scope visible to the
            # user; confirm whether server-internal scopes should be excluded.
            for scope in bucket.collections().get_all_scopes():
                for collection in scope.collections:
                    self._ensure_primary_index(cluster, self.bucket_name, scope.name, collection.name)
                    streams.append(DocumentStream(cluster, self.bucket_name, scope.name, collection.name))
                    logging.info(f"Added stream for {scope.name}.{collection.name}")
        except Exception:
            # No stream took ownership of the cluster, so don't leak it.
            self._close_quietly(cluster)
            raise
        logging.info(f"Generated {len(streams)} streams")
        return streams