82 lines
3.0 KiB
Python
82 lines
3.0 KiB
Python
# Copyright (c) 2024 Couchbase, Inc., all rights reserved.
|
|
|
|
from datetime import datetime
|
|
from typing import Any, Iterable, List, Mapping, MutableMapping
|
|
|
|
from couchbase.cluster import Cluster
|
|
|
|
from airbyte_cdk.models import AirbyteMessage, AirbyteRecordMessage, SyncMode, Type
|
|
from airbyte_cdk.sources.streams import Stream
|
|
from airbyte_cdk.sources.streams.core import CheckpointMixin
|
|
|
|
from .queries import get_documents_query
|
|
|
|
|
|
class CouchbaseStream(Stream):
    """Base Airbyte stream bound to one Couchbase collection.

    The stream's name is the fully qualified keyspace
    ``"<bucket>.<scope>.<collection>"``.
    """

    # Attribute Airbyte reads to learn which record field is the primary key.
    primary_key = "_id"

    def __init__(self, cluster: Cluster, bucket: str, scope: str, collection: str):
        """Store the cluster handle and the keyspace coordinates.

        :param cluster: Couchbase ``Cluster`` handle, kept as ``self.cluster``.
        :param bucket: bucket name.
        :param scope: scope name inside the bucket.
        :param collection: collection name inside the scope.
        """
        self.cluster = cluster
        self.bucket, self.scope, self.collection = bucket, scope, collection
        # The keyspace never changes for an instance, so build it once here.
        self._name = "{}.{}.{}".format(bucket, scope, collection)

    @property
    def name(self) -> str:
        """Return the ``bucket.scope.collection`` keyspace used as the stream name."""
        return self._name
|
|
|
|
|
|
class DocumentStream(CouchbaseStream, CheckpointMixin):
    """Stream that reads the documents of one Couchbase collection.

    Supports full-refresh and incremental syncs. In incremental mode the
    stream tracks, in its own checkpoint state, the largest integer value of
    ``cursor_field`` seen so far.
    """

    # Record field used as the incremental cursor; stored in state as an int.
    cursor_field = "_ab_cdc_updated_at"

    def __init__(self, cluster: Cluster, bucket: str, scope: str, collection: str):
        super().__init__(cluster, bucket, scope, collection)
        # Checkpoint state; replaced wholesale through the ``state`` setter.
        self._state: MutableMapping[str, Any] = {}

    @property
    def state(self) -> MutableMapping[str, Any]:
        """Return the current checkpoint state.

        After incremental reads it has the form ``{cursor_field: <int>}``.
        """
        return self._state

    @state.setter
    def state(self, value: MutableMapping[str, Any]):
        # Normalise a missing state to an empty mapping so that
        # get_updated_state() can always call ``.get`` on it.
        self._state = {} if value is None else value

    def get_json_schema(self) -> Mapping[str, Any]:
        """Return the JSON schema of the records emitted by this stream.

        Declares ``_id`` (string), the integer cursor field, and an open
        object property named after the collection (presumably the document
        body as projected by ``get_documents_query`` -- confirm there).
        """
        return {
            "$schema": "http://json-schema.org/draft-07/schema#",
            "type": "object",
            "properties": {
                "_id": {"type": "string"},
                self.cursor_field: {"type": "integer"},
                self.collection: {"type": "object", "additionalProperties": True},
            },
        }

    def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: AirbyteRecordMessage) -> Mapping[str, Any]:
        """Merge one record's cursor value into the state.

        :param current_stream_state: state before this record.
        :param latest_record: record message whose ``data`` may hold the cursor.
        :returns: a new ``{cursor_field: int}`` mapping when the record's cursor
            is strictly greater than the stored one (or none is stored yet);
            otherwise ``current_stream_state`` itself, unchanged.
        :raises ValueError, TypeError: if a cursor value is not int-convertible.
        """
        latest_cursor_value = latest_record.data.get(self.cursor_field)
        current_cursor_value = current_stream_state.get(self.cursor_field)

        if latest_cursor_value is not None and (current_cursor_value is None or int(latest_cursor_value) > int(current_cursor_value)):
            return {self.cursor_field: int(latest_cursor_value)}
        return current_stream_state

    def read_records(
        self,
        sync_mode: SyncMode,
        cursor_field: List[str] = None,
        stream_slice: Mapping[str, Any] = None,
        stream_state: Mapping[str, Any] = None,
    ) -> Iterable[AirbyteMessage]:
        """Query the collection and yield each row as an Airbyte RECORD message.

        :param sync_mode: full refresh passes ``None`` as the cursor value to
            ``get_documents_query``; incremental passes the stored cursor.
        :param cursor_field: unused; the stream always uses ``cursor_field``.
        :param stream_slice: unused.
        :param stream_state: previous state; its cursor value (default 0) is
            handed to ``get_documents_query`` in incremental mode.
        """
        incremental = sync_mode == SyncMode.incremental
        cursor_value = stream_state.get(self.cursor_field, 0) if stream_state else 0

        query = get_documents_query(
            self.bucket, self.scope, self.collection, self.cursor_field, cursor_value if incremental else None
        )

        for row in self.cluster.query(query):
            # Airbyte's emitted_at is epoch milliseconds. Scale before truncating:
            # int(ts) * 1000 would drop the sub-second part.
            emitted_at = int(datetime.now().timestamp() * 1000)
            record = AirbyteRecordMessage(stream=self.name, data=row, emitted_at=emitted_at)
            if incremental:
                # Advance state before handing the record out, so a checkpoint
                # taken right after this record already reflects it.
                # NOTE(review): checkpointing the running maximum mid-stream is
                # only safe if get_documents_query orders rows by the cursor
                # field -- confirm in .queries.
                self.state = self.get_updated_state(self.state, record)
            yield AirbyteMessage(type=Type.RECORD, record=record)
|