171 lines
7.3 KiB
Python
171 lines
7.3 KiB
Python
#
|
|
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
|
|
#
|
|
|
|
from http import HTTPStatus
|
|
from unittest.mock import MagicMock
|
|
|
|
import pytest
|
|
import requests
|
|
import responses
|
|
from source_convex.source import ConvexStream
|
|
|
|
from airbyte_cdk.models import SyncMode
|
|
|
|
|
|
@pytest.fixture
|
|
def patch_base_class(mocker):
|
|
# Mock abstract methods to enable instantiating abstract class
|
|
mocker.patch.object(ConvexStream, "primary_key", "test_primary_key")
|
|
mocker.patch.object(ConvexStream, "__abstractmethods__", set())
|
|
|
|
|
|
def test_request_params(patch_base_class):
|
|
stream = ConvexStream("murky-swan-635", "accesskey", "json", "messages", None)
|
|
inputs = {"stream_slice": None, "stream_state": None, "next_page_token": None}
|
|
expected_params = {"tableName": "messages", "format": "json"}
|
|
assert stream.request_params(**inputs) == expected_params
|
|
stream._snapshot_cursor_value = 1234
|
|
expected_params = {"tableName": "messages", "format": "json", "cursor": 1234}
|
|
assert stream.request_params(**inputs) == expected_params
|
|
stream._snapshot_has_more = False
|
|
stream._delta_cursor_value = 2345
|
|
expected_params = {"tableName": "messages", "format": "json", "cursor": 2345}
|
|
assert stream.request_params(**inputs) == expected_params
|
|
|
|
|
|
def test_next_page_token(patch_base_class):
|
|
stream = ConvexStream("murky-swan-635", "accesskey", "json", "messages", None)
|
|
resp = MagicMock()
|
|
resp.json = lambda: {"values": [{"_id": "my_id", "field": "f", "_ts": 123}], "cursor": 1234, "snapshot": 5000, "hasMore": True}
|
|
resp.status_code = 200
|
|
stream.parse_response(resp, {})
|
|
assert stream.next_page_token(resp) == {
|
|
"snapshot_cursor": 1234,
|
|
"snapshot_has_more": True,
|
|
"delta_cursor": 5000,
|
|
}
|
|
resp.json = lambda: {"values": [{"_id": "my_id", "field": "f", "_ts": 1235}], "cursor": 1235, "snapshot": 5000, "hasMore": False}
|
|
stream.parse_response(resp, {})
|
|
assert stream.next_page_token(resp) == {
|
|
"snapshot_cursor": 1235,
|
|
"snapshot_has_more": False,
|
|
"delta_cursor": 5000,
|
|
}
|
|
resp.json = lambda: {"values": [{"_id": "my_id", "field": "f", "_ts": 1235}], "cursor": 6000, "hasMore": True}
|
|
stream.parse_response(resp, {})
|
|
assert stream.next_page_token(resp) == {
|
|
"snapshot_cursor": 1235,
|
|
"snapshot_has_more": False,
|
|
"delta_cursor": 6000,
|
|
}
|
|
resp.json = lambda: {"values": [{"_id": "my_id", "field": "f", "_ts": 1235}], "cursor": 7000, "hasMore": False}
|
|
stream.parse_response(resp, {})
|
|
assert stream.next_page_token(resp) is None
|
|
assert stream.state == {"snapshot_cursor": 1235, "snapshot_has_more": False, "delta_cursor": 7000}
|
|
|
|
|
|
@responses.activate
|
|
def test_read_records_full_refresh(patch_base_class):
|
|
stream = ConvexStream("http://mocked_base_url:8080", "accesskey", "json", "messages", None)
|
|
snapshot0_resp = {"values": [{"_id": "my_id", "field": "f", "_ts": 123}], "cursor": 1234, "snapshot": 5000, "hasMore": True}
|
|
responses.add(
|
|
responses.GET,
|
|
"http://mocked_base_url:8080/api/list_snapshot?tableName=messages&format=json",
|
|
json=snapshot0_resp,
|
|
)
|
|
snapshot1_resp = {"values": [{"_id": "an_id", "field": "b", "_ts": 100}], "cursor": 2345, "snapshot": 5000, "hasMore": True}
|
|
responses.add(
|
|
responses.GET,
|
|
"http://mocked_base_url:8080/api/list_snapshot?tableName=messages&format=json&cursor=1234&snapshot=5000",
|
|
json=snapshot1_resp,
|
|
)
|
|
snapshot2_resp = {"values": [{"_id": "a_id", "field": "x", "_ts": 300}], "cursor": 3456, "snapshot": 5000, "hasMore": False}
|
|
responses.add(
|
|
responses.GET,
|
|
"http://mocked_base_url:8080/api/list_snapshot?tableName=messages&format=json&cursor=2345&snapshot=5000",
|
|
json=snapshot2_resp,
|
|
)
|
|
records = list(stream.read_records(SyncMode.full_refresh))
|
|
assert len(records) == 3
|
|
assert [record["field"] for record in records] == ["f", "b", "x"]
|
|
assert stream.state == {"delta_cursor": 5000, "snapshot_cursor": 3456, "snapshot_has_more": False}
|
|
|
|
|
|
@responses.activate
|
|
def test_read_records_incremental(patch_base_class):
|
|
stream = ConvexStream("http://mocked_base_url:8080", "accesskey", "json", "messages", None)
|
|
snapshot0_resp = {"values": [{"_id": "my_id", "field": "f", "_ts": 123}], "cursor": 1234, "snapshot": 5000, "hasMore": True}
|
|
responses.add(
|
|
responses.GET,
|
|
"http://mocked_base_url:8080/api/list_snapshot?tableName=messages&format=json",
|
|
json=snapshot0_resp,
|
|
)
|
|
snapshot1_resp = {"values": [{"_id": "an_id", "field": "b", "_ts": 100}], "cursor": 2345, "snapshot": 5000, "hasMore": False}
|
|
responses.add(
|
|
responses.GET,
|
|
"http://mocked_base_url:8080/api/list_snapshot?tableName=messages&format=json&cursor=1234&snapshot=5000",
|
|
json=snapshot1_resp,
|
|
)
|
|
delta0_resp = {"values": [{"_id": "a_id", "field": "x", "_ts": 300}], "cursor": 6000, "hasMore": True}
|
|
responses.add(
|
|
responses.GET,
|
|
"http://mocked_base_url:8080/api/document_deltas?tableName=messages&format=json&cursor=5000",
|
|
json=delta0_resp,
|
|
)
|
|
delta1_resp = {"values": [{"_id": "a_id", "field": "x", "_ts": 400}], "cursor": 7000, "hasMore": False}
|
|
responses.add(
|
|
responses.GET,
|
|
"http://mocked_base_url:8080/api/document_deltas?tableName=messages&format=json&cursor=6000",
|
|
json=delta1_resp,
|
|
)
|
|
records = list(stream.read_records(SyncMode.incremental))
|
|
assert len(records) == 4
|
|
assert [record["field"] for record in records] == ["f", "b", "x", "x"]
|
|
assert stream.state == {"delta_cursor": 7000, "snapshot_cursor": 2345, "snapshot_has_more": False}
|
|
|
|
|
|
def test_parse_response(patch_base_class):
|
|
stream = ConvexStream("murky-swan-635", "accesskey", "json", "messages", None)
|
|
resp = MagicMock()
|
|
resp.json = lambda: {"values": [{"_id": "my_id", "field": "f", "_ts": 1234}], "cursor": 1234, "snapshot": 2000, "hasMore": True}
|
|
resp.status_code = 200
|
|
inputs = {"response": resp, "stream_state": {}}
|
|
expected_parsed_objects = [{"_id": "my_id", "field": "f", "_ts": 1234}]
|
|
assert stream.parse_response(**inputs) == expected_parsed_objects
|
|
|
|
|
|
def test_request_headers(patch_base_class):
|
|
stream = ConvexStream("murky-swan-635", "accesskey", "json", "messages", None)
|
|
inputs = {"stream_slice": None, "stream_state": None, "next_page_token": None}
|
|
assert stream.request_headers(**inputs) == {"Convex-Client": "airbyte-export-0.4.0"}
|
|
|
|
|
|
def test_http_method(patch_base_class):
|
|
stream = ConvexStream("murky-swan-635", "accesskey", "json", "messages", None)
|
|
expected_method = "GET"
|
|
assert stream.http_method == expected_method
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
("http_status", "should_retry"),
|
|
[
|
|
(HTTPStatus.OK, False),
|
|
(HTTPStatus.BAD_REQUEST, False),
|
|
(HTTPStatus.TOO_MANY_REQUESTS, True),
|
|
(HTTPStatus.INTERNAL_SERVER_ERROR, True),
|
|
],
|
|
)
|
|
def test_should_retry(patch_base_class, http_status, should_retry):
|
|
response_mock = MagicMock()
|
|
response_mock.status_code = http_status
|
|
stream = ConvexStream("murky-swan-635", "accesskey", "json", "messages", None)
|
|
assert stream.should_retry(response_mock) == should_retry
|
|
|
|
|
|
def test_backoff_time(patch_base_class):
|
|
response_mock = MagicMock()
|
|
stream = ConvexStream("murky-swan-635", "accesskey", "json", "messages", None)
|
|
expected_backoff_time = None
|
|
assert stream.backoff_time(response_mock) == expected_backoff_time
|