1
0
mirror of synced 2025-12-21 19:11:14 -05:00
Files
airbyte/airbyte-integrations/connectors/source-github/source_github/utils.py
2023-03-27 13:13:01 +03:00

27 lines
708 B
Python

#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.streams import Stream
def getter(D: dict, key_or_keys, strict=True):
    """Look up a value in a nested dict by a single key or a path of keys.

    Args:
        D: The dictionary to read from.
        key_or_keys: A single key, or a list of keys applied one after
            another to descend into nested dictionaries. Only a ``list`` is
            treated as a path; any other value is used as one key.
        strict: When true, every key is read with ``D[k]``, so a missing key
            raises. When false, a missing key yields ``{}`` instead.

    Returns:
        The value found at the end of the path. In non-strict mode, ``{}``
        if the path cannot be followed to the end.

    Raises:
        KeyError: In strict mode, when a key is missing from a dict.
    """
    keys = key_or_keys if isinstance(key_or_keys, list) else [key_or_keys]
    for k in keys:
        if strict:
            D = D[k]
            continue
        try:
            D = D.get(k, {})
        except AttributeError:
            # The previous step produced something without ``.get`` (e.g.
            # None or a string), so the rest of the path cannot exist.
            # Non-strict lookups report that as the empty fallback.
            return {}
    return D
def read_full_refresh(stream_instance: Stream):
    """Yield every record of ``stream_instance`` using full-refresh sync mode.

    Slices are requested from the stream with ``SyncMode.full_refresh``; the
    records of each slice are then read with the same sync mode and yielded
    one by one, slice after slice.
    """
    mode = SyncMode.full_refresh
    for stream_slice in stream_instance.stream_slices(sync_mode=mode):
        yield from stream_instance.read_records(stream_slice=stream_slice, sync_mode=mode)