1
0
mirror of synced 2025-12-26 14:02:10 -05:00

Implement basic ExchangeRate API using Santa. (#2942)

Take Santa for a test drive.

Slight modification to Santa framework.
This commit is contained in:
Davin Chia
2021-04-21 17:09:39 +08:00
committed by GitHub
parent c0ac11d010
commit 3d2d20b87e
14 changed files with 527 additions and 7 deletions

View File

@@ -139,13 +139,16 @@ class AbstractSource(Source, ABC):
logger.info(f"Setting state of {stream_name} stream to {stream_state.get(stream_name)}")
checkpoint_interval = stream_instance.state_checkpoint_interval
batches = stream_instance.stream_slices(
slices = stream_instance.stream_slices(
cursor_field=configured_stream.cursor_field, sync_mode=SyncMode.incremental, stream_state=stream_state
)
for batch in batches:
for slice in slices:
record_counter = 0
records = stream_instance.read_records(
sync_mode=SyncMode.incremental, batch=batch, stream_state=stream_state, cursor_field=configured_stream.cursor_field or None
sync_mode=SyncMode.incremental,
stream_slice=slice,
stream_state=stream_state,
cursor_field=configured_stream.cursor_field or None,
)
for record_data in records:
record_counter += 1
@@ -158,8 +161,8 @@ class AbstractSource(Source, ABC):
def _read_full_refresh(self, stream_instance: Stream, configured_stream: ConfiguredAirbyteStream) -> Iterator[AirbyteMessage]:
args = {"sync_mode": SyncMode.full_refresh, "cursor_field": configured_stream.cursor_field}
for batch in stream_instance.stream_slices(**args):
for record in stream_instance.read_records(batch=batch, **args):
for slices in stream_instance.stream_slices(**args):
for record in stream_instance.read_records(stream_slice=slices, **args):
yield self._as_airbyte_record(configured_stream.stream.name, record)
def _checkpoint_state(self, stream_name, stream_state, connector_state, logger):