1
0
mirror of synced 2026-01-17 12:07:50 -05:00

Add Primary Key to CDK. (#3105)

Hold off on publishing the version and updating various downstream consumers for now since a major refactor is happening.
This commit is contained in:
Davin Chia
2021-04-30 14:49:19 +08:00
committed by GitHub
parent 7d0a794bb4
commit fc00d36d0c
2 changed files with 33 additions and 0 deletions

View File

@@ -81,6 +81,11 @@ class Stream(ABC):
stream.source_defined_cursor = self.source_defined_cursor
stream.supported_sync_modes.append(SyncMode.incremental)
stream.default_cursor_field = self._wrapped_cursor_field()
keys = self._wrapped_primary_key()
if len(keys) > 0:
stream.source_defined_primary_key = keys
return stream
@property
@@ -108,6 +113,33 @@ class Stream(ABC):
"""
return True
@property
@abstractmethod
def primary_key(self) -> Optional[Union[str, List[str], List[List[str]]]]:
"""
:return: string if single primary key, list of strings if composite primary key, list of list of strings if composite primary key consisting of nested fields.
If the stream has no primary keys, return None.
"""
def _wrapped_primary_key(self) -> Optional[List[List[str]]]:
"""
:return: wrap the primary_key property in a list of list of strings required by the Airbyte Stream object.
"""
keys = self.primary_key
if isinstance(keys, str):
return [[keys]]
elif isinstance(keys, list):
wrapped_key = []
for component in keys:
if isinstance(component, str):
wrapped_key.append([component])
elif isinstance(component, list):
wrapped_key.append(component)
else:
raise ValueError("Element must be either list or str.")
else:
raise ValueError("Element must be either list or str.")
def stream_slices(
self, sync_mode: SyncMode, cursor_field: List[str] = None, stream_state: Mapping[str, Any] = None
) -> Iterable[Optional[Mapping[str, any]]]: