90 lines
3.4 KiB
Python
90 lines
3.4 KiB
Python
#
|
|
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
|
|
#
|
|
|
|
|
|
from unittest.mock import MagicMock
|
|
|
|
from pytest import fixture
|
|
from source_convex.source import ConvexStream
|
|
|
|
from airbyte_cdk.models import SyncMode
|
|
|
|
|
|
@fixture
|
|
def patch_incremental_base_class(mocker):
|
|
# Mock abstract methods to enable instantiating abstract class
|
|
mocker.patch.object(ConvexStream, "path", "v0/example_endpoint")
|
|
mocker.patch.object(ConvexStream, "primary_key", "test_primary_key")
|
|
mocker.patch.object(ConvexStream, "__abstractmethods__", set())
|
|
|
|
|
|
def test_cursor_field(patch_incremental_base_class):
|
|
stream = ConvexStream("murky-swan-635", "accesskey", "json", "messages", None)
|
|
expected_cursor_field = "_ts"
|
|
assert stream.cursor_field == expected_cursor_field
|
|
|
|
|
|
def test_get_updated_state(patch_incremental_base_class):
|
|
stream = ConvexStream("murky-swan-635", "accesskey", "json", "messages", None)
|
|
resp = MagicMock()
|
|
resp.json = lambda: {"values": [{"_id": "my_id", "field": "f", "_ts": 123}], "cursor": 1234, "snapshot": 3000, "hasMore": True}
|
|
resp.status_code = 200
|
|
stream.parse_response(resp, {})
|
|
stream.next_page_token(resp)
|
|
assert stream.get_updated_state(None, None) == {
|
|
"snapshot_cursor": 1234,
|
|
"snapshot_has_more": True,
|
|
"delta_cursor": 3000,
|
|
}
|
|
resp.json = lambda: {"values": [{"_id": "my_id", "field": "f", "_ts": 1235}], "cursor": 1235, "snapshot": 3000, "hasMore": False}
|
|
stream.parse_response(resp, {})
|
|
stream.next_page_token(resp)
|
|
assert stream.get_updated_state(None, None) == {
|
|
"snapshot_cursor": 1235,
|
|
"snapshot_has_more": False,
|
|
"delta_cursor": 3000,
|
|
}
|
|
resp.json = lambda: {"values": [{"_id": "my_id", "field": "f", "_ts": 1235}], "cursor": 8000, "hasMore": True}
|
|
stream.parse_response(resp, {})
|
|
stream.next_page_token(resp)
|
|
assert stream.get_updated_state(None, None) == {
|
|
"snapshot_cursor": 1235,
|
|
"snapshot_has_more": False,
|
|
"delta_cursor": 8000,
|
|
}
|
|
assert stream._delta_has_more is True
|
|
resp.json = lambda: {"values": [{"_id": "my_id", "field": "f", "_ts": 1235}], "cursor": 9000, "hasMore": False}
|
|
stream.parse_response(resp, {})
|
|
stream.next_page_token(resp)
|
|
assert stream.get_updated_state(None, None) == {
|
|
"snapshot_cursor": 1235,
|
|
"snapshot_has_more": False,
|
|
"delta_cursor": 9000,
|
|
}
|
|
assert stream._delta_has_more is False
|
|
|
|
|
|
def test_stream_slices(patch_incremental_base_class):
|
|
stream = ConvexStream("murky-swan-635", "accesskey", "json", "messages", None)
|
|
inputs = {"sync_mode": SyncMode.incremental, "cursor_field": [], "stream_state": {}}
|
|
expected_stream_slice = [None]
|
|
assert stream.stream_slices(**inputs) == expected_stream_slice
|
|
|
|
|
|
def test_supports_incremental(patch_incremental_base_class, mocker):
|
|
mocker.patch.object(ConvexStream, "cursor_field", "dummy_field")
|
|
stream = ConvexStream("murky-swan-635", "accesskey", "json", "messages", None)
|
|
assert stream.supports_incremental
|
|
|
|
|
|
def test_source_defined_cursor(patch_incremental_base_class):
|
|
stream = ConvexStream("murky-swan-635", "accesskey", "json", "messages", None)
|
|
assert stream.source_defined_cursor
|
|
|
|
|
|
def test_stream_checkpoint_interval(patch_incremental_base_class):
|
|
stream = ConvexStream("murky-swan-635", "accesskey", "json", "messages", None)
|
|
expected_checkpoint_interval = 128
|
|
assert stream.state_checkpoint_interval == expected_checkpoint_interval
|