🐛 CDK: fix bug with limit parameter for incremental stream (#5833)
* CDK: fix bug with limit parameter for incremental stream Co-authored-by: Dmytro Rezchykov <dmitry.rezchykov@zazmic.com>
This commit is contained in:
@@ -137,29 +137,40 @@ class AbstractSource(Source, ABC):
|
||||
|
||||
use_incremental = configured_stream.sync_mode == SyncMode.incremental and stream_instance.supports_incremental
|
||||
if use_incremental:
|
||||
record_iterator = self._read_incremental(logger, stream_instance, configured_stream, connector_state)
|
||||
record_iterator = self._read_incremental(logger, stream_instance, configured_stream, connector_state, internal_config)
|
||||
else:
|
||||
record_iterator = self._read_full_refresh(stream_instance, configured_stream)
|
||||
record_iterator = self._read_full_refresh(stream_instance, configured_stream, internal_config)
|
||||
|
||||
record_counter = 0
|
||||
stream_name = configured_stream.stream.name
|
||||
logger.info(f"Syncing stream: {stream_name} ")
|
||||
for record in record_iterator:
|
||||
if record.type == MessageType.RECORD:
|
||||
if internal_config.limit and record_counter >= internal_config.limit:
|
||||
logger.info(f"Reached limit defined by internal config ({internal_config.limit}), stop reading")
|
||||
break
|
||||
record_counter += 1
|
||||
yield record
|
||||
|
||||
logger.info(f"Read {record_counter} records from {stream_name} stream")
|
||||
|
||||
@staticmethod
|
||||
def _limit_reached(internal_config: InternalConfig, records_counter: int) -> bool:
|
||||
"""
|
||||
Check if record count reached liimt set by internal config.
|
||||
:param internal_config - internal CDK configuration separated from user defined config
|
||||
:records_counter - number of records already red
|
||||
:return True if limit reached, False otherwise
|
||||
"""
|
||||
if internal_config.limit:
|
||||
if records_counter >= internal_config.limit:
|
||||
return True
|
||||
return False
|
||||
|
||||
def _read_incremental(
|
||||
self,
|
||||
logger: AirbyteLogger,
|
||||
stream_instance: Stream,
|
||||
configured_stream: ConfiguredAirbyteStream,
|
||||
connector_state: MutableMapping[str, Any],
|
||||
internal_config: InternalConfig,
|
||||
) -> Iterator[AirbyteMessage]:
|
||||
stream_name = configured_stream.stream.name
|
||||
stream_state = connector_state.get(stream_name, {})
|
||||
@@ -170,31 +181,46 @@ class AbstractSource(Source, ABC):
|
||||
slices = stream_instance.stream_slices(
|
||||
cursor_field=configured_stream.cursor_field, sync_mode=SyncMode.incremental, stream_state=stream_state
|
||||
)
|
||||
total_records_counter = 0
|
||||
for slice in slices:
|
||||
record_counter = 0
|
||||
records = stream_instance.read_records(
|
||||
sync_mode=SyncMode.incremental,
|
||||
stream_slice=slice,
|
||||
stream_state=stream_state,
|
||||
cursor_field=configured_stream.cursor_field or None,
|
||||
)
|
||||
for record_data in records:
|
||||
record_counter += 1
|
||||
for record_counter, record_data in enumerate(records, start=1):
|
||||
yield self._as_airbyte_record(stream_name, record_data)
|
||||
stream_state = stream_instance.get_updated_state(stream_state, record_data)
|
||||
if checkpoint_interval and record_counter % checkpoint_interval == 0:
|
||||
yield self._checkpoint_state(stream_name, stream_state, connector_state, logger)
|
||||
|
||||
yield self._checkpoint_state(stream_name, stream_state, connector_state, logger)
|
||||
total_records_counter += 1
|
||||
# This functionality should ideally live outside of this method
|
||||
# but since state is managed inside this method, we keep track
|
||||
# of it here.
|
||||
if self._limit_reached(internal_config, total_records_counter):
|
||||
# Break from slice loop to save state and exit from _read_incremental function.
|
||||
break
|
||||
|
||||
def _read_full_refresh(self, stream_instance: Stream, configured_stream: ConfiguredAirbyteStream) -> Iterator[AirbyteMessage]:
|
||||
yield self._checkpoint_state(stream_name, stream_state, connector_state, logger)
|
||||
if self._limit_reached(internal_config, total_records_counter):
|
||||
return
|
||||
|
||||
def _read_full_refresh(
|
||||
self, stream_instance: Stream, configured_stream: ConfiguredAirbyteStream, internal_config: InternalConfig
|
||||
) -> Iterator[AirbyteMessage]:
|
||||
slices = stream_instance.stream_slices(sync_mode=SyncMode.full_refresh, cursor_field=configured_stream.cursor_field)
|
||||
total_records_counter = 0
|
||||
for slice in slices:
|
||||
records = stream_instance.read_records(
|
||||
stream_slice=slice, sync_mode=SyncMode.full_refresh, cursor_field=configured_stream.cursor_field
|
||||
)
|
||||
for record in records:
|
||||
yield self._as_airbyte_record(configured_stream.stream.name, record)
|
||||
total_records_counter += 1
|
||||
if self._limit_reached(internal_config, total_records_counter):
|
||||
return
|
||||
|
||||
def _checkpoint_state(self, stream_name, stream_state, connector_state, logger):
|
||||
logger.info(f"Setting state of {stream_name} stream to {stream_state}")
|
||||
|
||||
Reference in New Issue
Block a user