302 lines
12 KiB
Python
302 lines
12 KiB
Python
#
|
|
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
|
|
#
|
|
|
|
from urllib.parse import parse_qs, urlencode, urlparse
|
|
|
|
import pytest
|
|
from conftest import TEST_CONFIG, get_source
|
|
from freezegun import freeze_time
|
|
|
|
from airbyte_cdk.models import ConfiguredAirbyteCatalog, SyncMode
|
|
from airbyte_cdk.test.catalog_builder import CatalogBuilder
|
|
from airbyte_cdk.test.entrypoint_wrapper import EntrypointOutput, read
|
|
from airbyte_cdk.test.state_builder import StateBuilder
|
|
|
|
|
|
BASE = "https://api.twilio.com/2010-04-01"
|
|
|
|
ACCOUNTS_JSON = {
|
|
"accounts": [
|
|
{
|
|
"sid": "AC123",
|
|
"date_created": "2022-01-01T00:00:00Z",
|
|
"subresource_uris": {
|
|
"addresses": "/2010-04-01/Accounts/AC123/Addresses.json",
|
|
"calls": "/2010-04-01/Accounts/AC123/Calls.json",
|
|
"messages": "/2010-04-01/Accounts/AC123/Messages.json",
|
|
"recordings": "/2010-04-01/Accounts/AC123/Recordings.json",
|
|
},
|
|
}
|
|
],
|
|
}
|
|
|
|
|
|
def read_from_stream(cfg, stream: str, sync_mode, state=None, expecting_exception: bool = False) -> EntrypointOutput:
|
|
catalog = CatalogBuilder().with_stream(stream, sync_mode).build()
|
|
return read(get_source(cfg, state), cfg, catalog, state, expecting_exception)
|
|
|
|
|
|
class TestTwilioStream:
|
|
def test_next_page_token(self, requests_mock):
|
|
accounts_page_1_json = {
|
|
"accounts": [
|
|
{
|
|
"sid": "AC123",
|
|
"date_created": "2022-01-01T00:00:00Z",
|
|
"subresource_uris": {"addresses": "/2010-04-01/Accounts/AC123/Addresses.json"},
|
|
}
|
|
],
|
|
"next_page_uri": "/2010-04-01/Accounts.json?PageSize=1000&Page=2&PageToken=PAAD42931b949c0dedce94b2f93847fdcf95",
|
|
}
|
|
requests_mock.get(f"{BASE}/Accounts.json", json=accounts_page_1_json, status_code=200)
|
|
|
|
accounts_page_2_json = {
|
|
"accounts": [
|
|
{
|
|
"sid": "AC124",
|
|
"date_created": "2022-01-01T00:00:00Z",
|
|
"subresource_uris": {"addresses": "/2010-04-01/Accounts/AC123/Addresses.json"},
|
|
}
|
|
]
|
|
}
|
|
requests_mock.get(
|
|
f"{BASE}/Accounts.json?PageSize=1000&Page=2&PageToken=PAAD42931b949c0dedce94b2f93847fdcf95",
|
|
json=accounts_page_2_json,
|
|
status_code=200,
|
|
)
|
|
|
|
records = read_from_stream(TEST_CONFIG, "accounts", SyncMode.full_refresh).records
|
|
|
|
assert len(records) == 2
|
|
|
|
def test_backoff_time(self, requests_mock, mocker):
|
|
sleep_mock = mocker.patch("time.sleep")
|
|
|
|
requests_mock.register_uri(
|
|
"GET",
|
|
f"{BASE}/Accounts.json",
|
|
[
|
|
{"status_code": 429, "json": {}, "headers": {"retry-after": "5.5"}},
|
|
{"status_code": 200, "json": ACCOUNTS_JSON},
|
|
],
|
|
)
|
|
|
|
records = read_from_stream(TEST_CONFIG, "accounts", SyncMode.full_refresh).records
|
|
|
|
assert len(records) == 1
|
|
assert sleep_mock.called
|
|
sleep_mock.assert_any_call(pytest.approx(6.5))
|
|
|
|
def test_transform_function(self, requests_mock):
|
|
accounts_json = {
|
|
"accounts": [
|
|
{
|
|
"sid": "AC123",
|
|
"date_created": "2022-01-01T00:00:00Z",
|
|
"date_updated": "Fri, 11 Dec 2020 04:28:40 +0000",
|
|
"subresource_uris": {"addresses": "/2010-04-01/Accounts/AC123/Addresses.json"},
|
|
}
|
|
]
|
|
}
|
|
requests_mock.get(f"{BASE}/Accounts.json", json=accounts_json, status_code=200)
|
|
|
|
records = read_from_stream(TEST_CONFIG, "accounts", SyncMode.full_refresh).records
|
|
|
|
assert len(records) == 1
|
|
assert records[0].record.data["date_created"] == "2022-01-01T00:00:00Z"
|
|
assert records[0].record.data["date_updated"] == "2020-12-11T04:28:40Z"
|
|
|
|
|
|
class TestIncrementalTwilioStream:
|
|
@freeze_time("2022-11-16 12:03:11+00:00")
|
|
def test_calls_includes_date_window_params(self, requests_mock):
|
|
requests_mock.get(f"{BASE}/Accounts.json", json=ACCOUNTS_JSON, status_code=200)
|
|
|
|
qs = urlencode({"EndTime>": "2022-11-15", "EndTime<": "2022-11-16", "PageSize": 1000})
|
|
requests_mock.get(
|
|
f"{BASE}/Accounts/AC123/Calls.json?{qs}",
|
|
json={"calls": [{"sid": "CA1", "end_time": "2022-11-15T12:00:00Z"}]},
|
|
status_code=200,
|
|
)
|
|
|
|
records = read_from_stream({**TEST_CONFIG, "start_date": "2022-11-15T00:00:00Z"}, "calls", SyncMode.full_refresh).records
|
|
assert len(records) == 1
|
|
|
|
@freeze_time("2022-11-16 12:03:11+00:00")
|
|
@pytest.mark.parametrize(
|
|
"stream_name,path,lower_key,upper_key,state,windows",
|
|
[
|
|
(
|
|
"messages",
|
|
"/Accounts/AC123/Messages.json",
|
|
"DateSent>",
|
|
"DateSent<",
|
|
{
|
|
"states": [
|
|
{
|
|
"partition": {"subresource_uri": "/2010-04-01/Accounts/AC123/Messages.json"},
|
|
"cursor": {"date_sent": "2022-11-13T12:11:10Z"},
|
|
}
|
|
]
|
|
},
|
|
[
|
|
("2022-11-13 12:11:10Z", "2022-11-16 12:03:11Z"),
|
|
],
|
|
),
|
|
(
|
|
"usage_records",
|
|
"/Accounts/AC123/Usage/Records/Daily.json",
|
|
"StartDate",
|
|
"EndDate",
|
|
{"states": [{"partition": {"account_sid": "AC123"}, "cursor": {"start_date": "2022-11-13"}}]},
|
|
[
|
|
("2022-11-13", "2022-11-16"),
|
|
],
|
|
),
|
|
(
|
|
"recordings",
|
|
"/Accounts/AC123/Recordings.json",
|
|
"DateCreated>",
|
|
"DateCreated<",
|
|
{
|
|
"states": [
|
|
{
|
|
"partition": {"subresource_uri": "/2010-04-01/Accounts/AC123/Recordings.json"},
|
|
"cursor": {"date_created": "2021-11-13 00:00:00Z"},
|
|
}
|
|
]
|
|
},
|
|
[
|
|
("2021-11-13 00:00:00Z", "2022-11-12 23:59:59Z"),
|
|
("2022-11-13 00:00:00Z", "2022-11-16 12:03:11Z"),
|
|
],
|
|
),
|
|
],
|
|
)
|
|
def test_incremental_calls_with_date_ranges(self, stream_name, path, lower_key, upper_key, state, windows, requests_mock):
|
|
def _register_date_window(m, path, body_key, lower_key, upper_key, lower_val, upper_val):
|
|
def _match(req):
|
|
q = parse_qs(urlparse(req.url).query, keep_blank_values=True)
|
|
return q.get(lower_key) == [lower_val] and q.get(upper_key) == [upper_val]
|
|
|
|
# one matcher per window
|
|
return m.get(f"{BASE}{path}", json={body_key: [{}]}, status_code=200, additional_matcher=_match)
|
|
|
|
# Parent
|
|
accounts_matcher = requests_mock.get(f"{BASE}/Accounts.json", json=ACCOUNTS_JSON, status_code=200)
|
|
|
|
# One matcher per expected window (exact query values)
|
|
child_matchers = [_register_date_window(requests_mock, path, stream_name, lower_key, upper_key, lo, hi) for (lo, hi) in windows]
|
|
|
|
state = (
|
|
StateBuilder()
|
|
.with_stream_state(
|
|
stream_name,
|
|
state,
|
|
)
|
|
.build()
|
|
)
|
|
|
|
_ = read_from_stream({**TEST_CONFIG, "start_date": "2000-11-15T00:00:00Z"}, stream_name, SyncMode.incremental, state).records
|
|
|
|
assert accounts_matcher.called, "Accounts endpoint was not called"
|
|
assert all(m.called for m in child_matchers), "Not all date-window URLs were called"
|
|
assert sum(m.call_count for m in child_matchers) == len(windows)
|
|
|
|
|
|
class TestTwilioNestedStream:
|
|
@freeze_time("2022-11-16 12:03:11+00:00")
|
|
def test_message_media_filters_num_media_zero(self, requests_mock):
|
|
ACCOUNTS_JSON = {
|
|
"accounts": [
|
|
{
|
|
"sid": "AC123",
|
|
"date_created": "2022-01-01T00:00:00Z",
|
|
"subresource_uris": {
|
|
"addresses": "/2010-04-01/Accounts/AC123/Addresses.json",
|
|
"calls": "/2010-04-01/Accounts/AC123/Calls.json",
|
|
"messages": "/2010-04-01/Accounts/AC123/Messages.json",
|
|
"recordings": "/2010-04-01/Accounts/AC123/Recordings.json",
|
|
},
|
|
}
|
|
],
|
|
}
|
|
# Parent accounts
|
|
requests_mock.get(f"{BASE}/Accounts.json", json=ACCOUNTS_JSON, status_code=200)
|
|
|
|
# Messages: one with num_media "0" (should be filtered out), one with "1" (should be kept)
|
|
messages_json = {
|
|
"messages": [
|
|
{
|
|
"sid": "SM0",
|
|
"account_sid": "AC123",
|
|
"num_media": "0",
|
|
"date_sent": "2022-11-16T01:00:00Z",
|
|
"subresource_uris": {"media": "/2010-04-01/Accounts/AC123/Messages/SM0/Media.json"},
|
|
},
|
|
{
|
|
"sid": "SM1",
|
|
"account_sid": "AC123",
|
|
"num_media": "1",
|
|
"date_sent": "2022-11-16T01:00:00Z",
|
|
"subresource_uris": {"media": "/2010-04-01/Accounts/AC123/Messages/SM1/Media.json"},
|
|
},
|
|
]
|
|
}
|
|
# Ignore query params (date slice, PageSize, etc.) so one matcher handles all windows.
|
|
requests_mock.get(f"{BASE}/Accounts/AC123/Messages.json", json=messages_json, status_code=200)
|
|
|
|
# Only register the valid media endpoint (SM1). If the stream tries SM0, test will fail (unmatched request).
|
|
media_json = {"media_list": [{"sid": "ME1", "date_created": "2022-11-16T01:05:00Z"}]}
|
|
media_matcher = requests_mock.get(
|
|
f"{BASE}/Accounts/AC123/Messages/SM1/Media.json",
|
|
json=media_json,
|
|
status_code=200,
|
|
)
|
|
|
|
cfg = {**TEST_CONFIG, "start_date": "2022-11-15T00:00:00Z"}
|
|
out = read_from_stream(cfg, "message_media", SyncMode.full_refresh)
|
|
records = out.records
|
|
|
|
# Assert we fetched media only for SM1
|
|
assert media_matcher.called, "Media endpoint for SM1 was not called"
|
|
assert len(records) == 1, f"Expected 1 media record (only from SM1), got {len(records)}"
|
|
|
|
@pytest.mark.parametrize(
|
|
"stream_name, expected_count",
|
|
[
|
|
("addresses", 1),
|
|
("dependent_phone_numbers", 1),
|
|
],
|
|
)
|
|
def test_stream_http_end_to_end(self, stream_name, expected_count, requests_mock):
|
|
# 1) Parent: Accounts (provides the subresource_uris.addresses link)
|
|
accounts_json = {
|
|
"accounts": [
|
|
{
|
|
"sid": "AC123",
|
|
"date_created": "2022-01-01T00:00:00Z",
|
|
"subresource_uris": {"addresses": "/2010-04-01/Accounts/AC123/Addresses.json"},
|
|
}
|
|
]
|
|
}
|
|
requests_mock.get(f"{BASE}/Accounts.json", json=accounts_json, status_code=200)
|
|
|
|
# 2) Child: Addresses (collection key must match the stream name: "addresses")
|
|
addresses_json = {"addresses": [{"sid": "AD1", "account_sid": "AC123"}]}
|
|
requests_mock.get(f"{BASE}/Accounts/AC123/Addresses.json", json=addresses_json, status_code=200)
|
|
|
|
# 3) Grandchild: DependentPhoneNumbers (collection key must be "dependent_phone_numbers")
|
|
if stream_name == "dependent_phone_numbers":
|
|
dpn_json = {"dependent_phone_numbers": [{"sid": "PN1", "account_sid": "AC123"}]}
|
|
requests_mock.get(
|
|
f"{BASE}/Accounts/AC123/Addresses/AD1/DependentPhoneNumbers.json",
|
|
json=dpn_json,
|
|
status_code=200,
|
|
)
|
|
|
|
records = read_from_stream(TEST_CONFIG, stream_name, SyncMode.full_refresh).records
|
|
|
|
assert len(records) == expected_count
|