* checkout from alex/cac * doc * doc * remove broken test * rename * rename file * delete unused file * rename * abstract property * isort * update state * Update comment * remove incremental mixin * delete comment * update comments * update comments * remove no_state * rename package * pass parameters through kwargs * update interface to pass source in interface * update interface to pass source in interface * rename to stream_slicer * Low code connectors: string interpolation with jinja (#12852) * checkout from alex/cac * Add missing tests * Add missing files * missing file * rename * jinja dependency * Add comment * comment * comment * Revert "delete unused file" This reverts commit 758e939367. * delete unused field * delete unused field * rename * pass kwargs directly * isort * Revert "isort" This reverts commit 4a79223944. * format * decoder * better error handling * remove nostate * isort * delete dead code * Update mapping type to [str, Any] * add comment * Add comment * pass parameters through kwargs * move test to right module * Add missing test * Use authbase instead of deprecated class * leverage generator * rename to declarative * rename the classes too
46 lines
1.5 KiB
Python
46 lines
1.5 KiB
Python
#
|
|
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
|
|
#
|
|
|
|
from unittest.mock import MagicMock
|
|
|
|
from airbyte_cdk.models import SyncMode
|
|
from airbyte_cdk.sources.declarative.declarative_stream import DeclarativeStream
|
|
|
|
|
|
def test():
    """Build a DeclarativeStream from stubbed collaborators and check what it exposes.

    The schema loader and retriever are MagicMock stubs with canned return
    values. The assertions verify that the stream's name, primary key, cursor
    field, JSON schema, state, records and stream slices all compare equal to
    the values handed to the constructor or configured on the stubs.
    """
    # Plain values passed straight to the constructor.
    expected_name = "stream"
    expected_primary_key = "pk"
    expected_cursor_field = ["created_at"]

    # Schema loader stub returning a fixed schema.
    expected_schema = {"name": {"type": "string"}}
    schema_loader_stub = MagicMock()
    schema_loader_stub.get_json_schema.return_value = expected_schema

    # Canned values the retriever stub hands back.
    expected_state = MagicMock()
    expected_records = [{"pk": 1234, "field": "value"}, {"pk": 4567, "field": "different_value"}]
    expected_slices = [{"date": "2021-01-01"}, {"date": "2021-01-02"}, {"date": "2021-01-03"}]
    interval = 1000

    # Configure the retriever stub in a single call; dotted keys set the
    # return values of the corresponding child mocks.
    retriever_stub = MagicMock()
    retriever_stub.configure_mock(
        **{
            "get_state.return_value": expected_state,
            "read_records.return_value": expected_records,
            "stream_slices.return_value": expected_slices,
            "state_checkpoint_interval": interval,
        }
    )

    stream = DeclarativeStream(
        name=expected_name,
        primary_key=expected_primary_key,
        cursor_field=expected_cursor_field,
        schema_loader=schema_loader_stub,
        retriever=retriever_stub,
    )

    assert stream.name == expected_name
    assert stream.get_json_schema() == expected_schema
    assert stream.state == expected_state

    fetched_records = stream.read_records(SyncMode.full_refresh, expected_cursor_field, None, None)
    assert fetched_records == expected_records

    assert stream.primary_key == expected_primary_key
    assert stream.cursor_field == expected_cursor_field

    produced_slices = stream.stream_slices(
        sync_mode=SyncMode.incremental,
        cursor_field=expected_cursor_field,
        stream_state=None,
    )
    assert produced_slices == expected_slices
|