#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
from urllib.parse import parse_qs, urlencode, urlparse
import pytest
from conftest import TEST_CONFIG, get_source
from freezegun import freeze_time
from airbyte_cdk.models import ConfiguredAirbyteCatalog, SyncMode
from airbyte_cdk.test.catalog_builder import CatalogBuilder
from airbyte_cdk.test.entrypoint_wrapper import EntrypointOutput, read
from airbyte_cdk.test.state_builder import StateBuilder
BASE = "https://api.twilio.com/2010-04-01"
ACCOUNTS_JSON = {
"accounts": [
{
"sid": "AC123",
"date_created": "2022-01-01T00:00:00Z",
"subresource_uris": {
"addresses": "/2010-04-01/Accounts/AC123/Addresses.json",
"calls": "/2010-04-01/Accounts/AC123/Calls.json",
"messages": "/2010-04-01/Accounts/AC123/Messages.json",
"recordings": "/2010-04-01/Accounts/AC123/Recordings.json",
},
}
],
}
def read_from_stream(cfg, stream: str, sync_mode, state=None, expecting_exception: bool = False) -> EntrypointOutput:
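    """Build a single-stream configured catalog and read it from the source, returning the entrypoint output."""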
catalog = CatalogBuilder().with_stream(stream, sync_mode).build()
return read(get_source(cfg, state), cfg, catalog, state, expecting_exception)
class TestTwilioStream:
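    """Base stream behaviour: pagination, rate-limit backoff, and record transformation."""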
def test_next_page_token(self, requests_mock):
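        """Two pages of records are read when the first response carries a next_page_uri."""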
accounts_page_1_json = {
"accounts": [
{
"sid": "AC123",
"date_created": "2022-01-01T00:00:00Z",
"subresource_uris": {"addresses": "/2010-04-01/Accounts/AC123/Addresses.json"},
}
],
"next_page_uri": "/2010-04-01/Accounts.json?PageSize=1000&Page=2&PageToken=PAAD42931b949c0dedce94b2f93847fdcf95",
}
requests_mock.get(f"{BASE}/Accounts.json", json=accounts_page_1_json, status_code=200)
accounts_page_2_json = {
"accounts": [
{
"sid": "AC124",
"date_created": "2022-01-01T00:00:00Z",
"subresource_uris": {"addresses": "/2010-04-01/Accounts/AC123/Addresses.json"},
}
]
}
requests_mock.get(
f"{BASE}/Accounts.json?PageSize=1000&Page=2&PageToken=PAAD42931b949c0dedce94b2f93847fdcf95",
json=accounts_page_2_json,
status_code=200,
)
records = read_from_stream(TEST_CONFIG, "accounts", SyncMode.full_refresh).records
assert len(records) == 2
def test_backoff_time(self, requests_mock, mocker):
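        """A 429 with a Retry-After header makes the stream sleep (Retry-After plus one second, per the assertion below) before the retried request succeeds."""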
sleep_mock = mocker.patch("time.sleep")
requests_mock.register_uri(
"GET",
f"{BASE}/Accounts.json",
[
{"status_code": 429, "json": {}, "headers": {"retry-after": "5.5"}},
{"status_code": 200, "json": ACCOUNTS_JSON},
],
)
records = read_from_stream(TEST_CONFIG, "accounts", SyncMode.full_refresh).records
assert len(records) == 1
assert sleep_mock.called
sleep_mock.assert_any_call(pytest.approx(6.5))
def test_transform_function(self, requests_mock):
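        """RFC 2822 timestamps in the response are normalised to ISO 8601 in the emitted record."""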
accounts_json = {
"accounts": [
{
"sid": "AC123",
"date_created": "2022-01-01T00:00:00Z",
"date_updated": "Fri, 11 Dec 2020 04:28:40 +0000",
"subresource_uris": {"addresses": "/2010-04-01/Accounts/AC123/Addresses.json"},
}
]
}
requests_mock.get(f"{BASE}/Accounts.json", json=accounts_json, status_code=200)
records = read_from_stream(TEST_CONFIG, "accounts", SyncMode.full_refresh).records
assert len(records) == 1
assert records[0].record.data["date_created"] == "2022-01-01T00:00:00Z"
assert records[0].record.data["date_updated"] == "2020-12-11T04:28:40Z"
class TestIncrementalTwilioStream:
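    """Incremental streams: date-window request parameters and state-driven slicing."""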
@freeze_time("2022-11-16 12:03:11+00:00")
def test_calls_includes_date_window_params(self, requests_mock):
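        """The calls request is filtered by an EndTime> / EndTime< window derived from the start date and the frozen 'now'."""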
requests_mock.get(f"{BASE}/Accounts.json", json=ACCOUNTS_JSON, status_code=200)
qs = urlencode({"EndTime>": "2022-11-15", "EndTime<": "2022-11-16", "PageSize": 1000})
requests_mock.get(
f"{BASE}/Accounts/AC123/Calls.json?{qs}",
json={"calls": [{"sid": "CA1", "end_time": "2022-11-15T12:00:00Z"}]},
status_code=200,
)
records = read_from_stream({**TEST_CONFIG, "start_date": "2022-11-15T00:00:00Z"}, "calls", SyncMode.full_refresh).records
assert len(records) == 1
@freeze_time("2022-11-16 12:03:11+00:00")
@pytest.mark.parametrize(
"stream_name,path,lower_key,upper_key,state,windows",
[
(
"messages",
"/Accounts/AC123/Messages.json",
"DateSent>",
"DateSent<",
{
"states": [
{
"partition": {"subresource_uri": "/2010-04-01/Accounts/AC123/Messages.json"},
"cursor": {"date_sent": "2022-11-13T12:11:10Z"},
}
]
},
[
("2022-11-13 12:11:10Z", "2022-11-16 12:03:11Z"),
],
),
(
"usage_records",
"/Accounts/AC123/Usage/Records/Daily.json",
"StartDate",
"EndDate",
{"states": [{"partition": {"account_sid": "AC123"}, "cursor": {"start_date": "2022-11-13"}}]},
[
("2022-11-13", "2022-11-16"),
],
),
(
"recordings",
"/Accounts/AC123/Recordings.json",
"DateCreated>",
"DateCreated<",
{
"states": [
{
"partition": {"subresource_uri": "/2010-04-01/Accounts/AC123/Recordings.json"},
"cursor": {"date_created": "2021-11-13 00:00:00Z"},
}
]
},
[
("2021-11-13 00:00:00Z", "2022-11-12 23:59:59Z"),
("2022-11-13 00:00:00Z", "2022-11-16 12:03:11Z"),
],
),
],
)
def test_incremental_calls_with_date_ranges(self, stream_name, path, lower_key, upper_key, state, windows, requests_mock):
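        """Exactly one request is issued per expected date window, with query bounds matching the stream state."""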
def _register_date_window(m, path, body_key, lower_key, upper_key, lower_val, upper_val):
def _match(req):
q = parse_qs(urlparse(req.url).query, keep_blank_values=True)
return q.get(lower_key) == [lower_val] and q.get(upper_key) == [upper_val]
# one matcher per window
return m.get(f"{BASE}{path}", json={body_key: [{}]}, status_code=200, additional_matcher=_match)
# Parent
accounts_matcher = requests_mock.get(f"{BASE}/Accounts.json", json=ACCOUNTS_JSON, status_code=200)
# One matcher per expected window (exact query values)
child_matchers = [_register_date_window(requests_mock, path, stream_name, lower_key, upper_key, lo, hi) for (lo, hi) in windows]
        state = StateBuilder().with_stream_state(stream_name, state).build()
_ = read_from_stream({**TEST_CONFIG, "start_date": "2000-11-15T00:00:00Z"}, stream_name, SyncMode.incremental, state).records
assert accounts_matcher.called, "Accounts endpoint was not called"
assert all(m.called for m in child_matchers), "Not all date-window URLs were called"
assert sum(m.call_count for m in child_matchers) == len(windows)
class TestTwilioNestedStream:
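    """Nested (parent/child) streams fetched via subresource_uris."""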
@freeze_time("2022-11-16 12:03:11+00:00")
def test_message_media_filters_num_media_zero(self, requests_mock):
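        """Media is fetched only for messages whose num_media is non-zero; SM0 (num_media "0") is skipped."""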
        # Parent accounts (reuses the module-level ACCOUNTS_JSON payload)
requests_mock.get(f"{BASE}/Accounts.json", json=ACCOUNTS_JSON, status_code=200)
# Messages: one with num_media "0" (should be filtered out), one with "1" (should be kept)
messages_json = {
"messages": [
{
"sid": "SM0",
"account_sid": "AC123",
"num_media": "0",
"date_sent": "2022-11-16T01:00:00Z",
"subresource_uris": {"media": "/2010-04-01/Accounts/AC123/Messages/SM0/Media.json"},
},
{
"sid": "SM1",
"account_sid": "AC123",
"num_media": "1",
"date_sent": "2022-11-16T01:00:00Z",
"subresource_uris": {"media": "/2010-04-01/Accounts/AC123/Messages/SM1/Media.json"},
},
]
}
# Ignore query params (date slice, PageSize, etc.) so one matcher handles all windows.
requests_mock.get(f"{BASE}/Accounts/AC123/Messages.json", json=messages_json, status_code=200)
        # Only register the valid media endpoint (SM1); if the stream requests SM0, the test fails with an unmatched request.
media_json = {"media_list": [{"sid": "ME1", "date_created": "2022-11-16T01:05:00Z"}]}
media_matcher = requests_mock.get(
f"{BASE}/Accounts/AC123/Messages/SM1/Media.json",
json=media_json,
status_code=200,
)
cfg = {**TEST_CONFIG, "start_date": "2022-11-15T00:00:00Z"}
out = read_from_stream(cfg, "message_media", SyncMode.full_refresh)
records = out.records
# Assert we fetched media only for SM1
assert media_matcher.called, "Media endpoint for SM1 was not called"
assert len(records) == 1, f"Expected 1 media record (only from SM1), got {len(records)}"
@pytest.mark.parametrize(
"stream_name, expected_count",
[
("addresses", 1),
("dependent_phone_numbers", 1),
],
)
def test_stream_http_end_to_end(self, stream_name, expected_count, requests_mock):
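        """Parent -> child (-> grandchild) chain: Accounts feeds Addresses, which feeds DependentPhoneNumbers."""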
# 1) Parent: Accounts (provides the subresource_uris.addresses link)
accounts_json = {
"accounts": [
{
"sid": "AC123",
"date_created": "2022-01-01T00:00:00Z",
"subresource_uris": {"addresses": "/2010-04-01/Accounts/AC123/Addresses.json"},
}
]
}
requests_mock.get(f"{BASE}/Accounts.json", json=accounts_json, status_code=200)
# 2) Child: Addresses (collection key must match the stream name: "addresses")
addresses_json = {"addresses": [{"sid": "AD1", "account_sid": "AC123"}]}
requests_mock.get(f"{BASE}/Accounts/AC123/Addresses.json", json=addresses_json, status_code=200)
# 3) Grandchild: DependentPhoneNumbers (collection key must be "dependent_phone_numbers")
if stream_name == "dependent_phone_numbers":
dpn_json = {"dependent_phone_numbers": [{"sid": "PN1", "account_sid": "AC123"}]}
requests_mock.get(
f"{BASE}/Accounts/AC123/Addresses/AD1/DependentPhoneNumbers.json",
json=dpn_json,
status_code=200,
)
records = read_from_stream(TEST_CONFIG, stream_name, SyncMode.full_refresh).records
assert len(records) == expected_count