From 02e4bd07f7fd80ce07bdf522adabcea1e7687f67 Mon Sep 17 00:00:00 2001 From: Brian Lai <51336873+brianjlai@users.noreply.github.com> Date: Thu, 22 Jun 2023 12:14:07 -0400 Subject: [PATCH] [26989] Add request filter for cloud and integration test fixtures for e2e sync testing (#27534) * add the request filters and integration test fixtures * pr feedback and some tweaks to the testing framework * optimize the cache for more hits * formatting * remove cache --- airbyte-cdk/python/airbyte_cdk/entrypoint.py | 67 ++++++- .../test_connector_builder_handler.py | 176 ++++++++++++++++++ .../unit_tests/sources/fixtures/__init__.py | 3 + .../sources/fixtures/source_test_fixture.py | 147 +++++++++++++++ .../sources/test_integration_source.py | 71 +++++++ .../python/unit_tests/test_entrypoint.py | 42 ++++- 6 files changed, 503 insertions(+), 3 deletions(-) create mode 100644 airbyte-cdk/python/unit_tests/sources/fixtures/__init__.py create mode 100644 airbyte-cdk/python/unit_tests/sources/fixtures/source_test_fixture.py create mode 100644 airbyte-cdk/python/unit_tests/sources/test_integration_source.py diff --git a/airbyte-cdk/python/airbyte_cdk/entrypoint.py b/airbyte-cdk/python/airbyte_cdk/entrypoint.py index 4a16b534e23..b914aacbbe9 100644 --- a/airbyte-cdk/python/airbyte_cdk/entrypoint.py +++ b/airbyte-cdk/python/airbyte_cdk/entrypoint.py @@ -2,14 +2,17 @@ # Copyright (c) 2023 Airbyte, Inc., all rights reserved. # - import argparse import importlib +import ipaddress import logging import os.path +import socket import sys import tempfile +from functools import wraps from typing import Any, Iterable, List, Mapping +from urllib.parse import urlparse from airbyte_cdk.connector import TConfig from airbyte_cdk.exception_handler import init_uncaught_exception_handler @@ -21,13 +24,24 @@ from airbyte_cdk.sources.source import TCatalog, TState from airbyte_cdk.sources.utils.schema_helpers import check_config_against_spec_or_exit, split_config from airbyte_cdk.utils.airbyte_secrets_utils import get_secrets, update_secrets from airbyte_cdk.utils.traced_exception import AirbyteTracedException +from requests import Session logger = init_logger("airbyte") +VALID_URL_SCHEMES = ["https"] +CLOUD_DEPLOYMENT_MODE = "cloud" + class AirbyteEntrypoint(object): def __init__(self, source: Source): init_uncaught_exception_handler(logger) + + # DEPLOYMENT_MODE is read when instantiating the entrypoint because it is the common path shared by syncs and connector + # builder test requests + deployment_mode = os.environ.get("DEPLOYMENT_MODE", "") + if deployment_mode.casefold() == CLOUD_DEPLOYMENT_MODE: + _init_internal_request_filter() + self.source = source self.logger = logging.getLogger(f"airbyte.{getattr(source, 'name', '')}") @@ -172,6 +186,57 @@ def launch(source: Source, args: List[str]): print(message) +def _init_internal_request_filter(): + """ + Wraps the Python requests library to prevent sending requests to internal URL endpoints. + """ + wrapped_fn = Session.send + + @wraps(wrapped_fn) + def filtered_send(self, request, **kwargs): + parsed_url = urlparse(request.url) + + if parsed_url.scheme not in VALID_URL_SCHEMES: + raise ValueError( + "Invalid Protocol Scheme: The endpoint that data is being requested from is using an invalid or insecure " + + f"protocol {parsed_url.scheme}. Valid protocol schemes: {','.join(VALID_URL_SCHEMES)}" + ) + + if not parsed_url.hostname: + raise ValueError("Invalid URL specified: The endpoint that data is being requested from is not a valid URL") + + try: + is_private = _is_private_url(parsed_url.hostname, parsed_url.port) + if is_private: + raise ValueError( + "Invalid URL endpoint: The endpoint that data is being requested from belongs to a private network. Source " + + "connectors only support requesting data from public API endpoints." + ) + except socket.gaierror: + # This is a special case where the developer specifies an IP address string that is not formatted correctly like trailing + # whitespace which will fail the socket IP lookup. This only happens when using IP addresses and not text hostnames. + raise ValueError(f"Invalid hostname or IP address '{parsed_url.hostname}' specified.") + + return wrapped_fn(self, request, **kwargs) + + Session.send = filtered_send + + +def _is_private_url(hostname: str, port: int) -> bool: + """ + Helper method that checks if any of the IP addresses associated with a hostname belong to a private network. + """ + address_info_entries = socket.getaddrinfo(hostname, port) + for entry in address_info_entries: + # getaddrinfo() returns entries in the form of a 5-tuple where the IP is stored as the sockaddr. For IPv4 this + # is a 2-tuple and for IPv6 it is a 4-tuple, but the address is always the first value of the tuple at 0. + # See https://docs.python.org/3/library/socket.html#socket.getaddrinfo for more details. + ip_address = entry[4][0] + if ipaddress.ip_address(ip_address).is_private: + return True + return False + + def main(): impl_module = os.environ.get("AIRBYTE_IMPL_MODULE", Source.__module__) impl_class = os.environ.get("AIRBYTE_IMPL_PATH", Source.__name__) diff --git a/airbyte-cdk/python/unit_tests/connector_builder/test_connector_builder_handler.py b/airbyte-cdk/python/unit_tests/connector_builder/test_connector_builder_handler.py index 376571e02a2..b694bebb81d 100644 --- a/airbyte-cdk/python/unit_tests/connector_builder/test_connector_builder_handler.py +++ b/airbyte-cdk/python/unit_tests/connector_builder/test_connector_builder_handler.py @@ -6,6 +6,7 @@ import copy import dataclasses import json import logging +import os from unittest import mock from unittest.mock import patch @@ -96,6 +97,54 @@ MANIFEST = { } } +OAUTH_MANIFEST = { + "version": "0.30.3", + "definitions": { + "retriever": { + "paginator": { + "type": "DefaultPaginator", + "page_size": _page_size, + "page_size_option": {"inject_into": "request_parameter", "field_name": "page_size"}, + "page_token_option": {"inject_into": "path", "type": "RequestPath"}, + "pagination_strategy": {"type": "CursorPagination", "cursor_value": "{{ response._metadata.next }}", "page_size": _page_size}, + }, + "partition_router": { + "type": "ListPartitionRouter", + "values": ["0", "1", "2", "3", "4", "5", "6", "7"], + "cursor_field": "item_id" + }, + "" + "requester": { + "path": "/v3/marketing/lists", + "authenticator": { + "type": "OAuthAuthenticator", + "api_token": "{{ config.apikey }}" + }, + "request_parameters": {"a_param": "10"}, + }, + "record_selector": {"extractor": {"field_path": ["result"]}}, + }, + }, + "streams": [ + { + "type": "DeclarativeStream", + "$parameters": _stream_options, + "retriever": "#/definitions/retriever", + }, + ], + "check": {"type": "CheckStream", "stream_names": ["lists"]}, + "spec": { + "connection_specification": { + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "required": [], + "properties": {}, + "additionalProperties": True + }, + "type": "Spec" + } +} + RESOLVE_MANIFEST_CONFIG = { "__injected_declarative_manifest": MANIFEST, "__command": "resolve_manifest", @@ -137,6 +186,14 @@ CONFIGURED_CATALOG = { ] } +MOCK_RESPONSE = { + "result": [ + {"id": 1, "name": "Nora Moon", "position": "director"}, + {"id": 2, "name": "Hae Sung Jung", "position": "cinematographer"}, + {"id": 3, "name": "Arthur Zenneranski", "position": "composer"}, + ] +} + @pytest.fixture def valid_resolve_manifest_config_file(tmp_path): @@ -175,6 +232,19 @@ def invalid_config_file(tmp_path): return config_file +def _mocked_send(self, request, **kwargs) -> requests.Response: + """ + Mocks the outbound send operation to provide faster and more reliable responses compared to actual API requests + """ + response = requests.Response() + response.request = request + response.status_code = 200 + response.headers = {"header": "value"} + response_body = MOCK_RESPONSE + response._content = json.dumps(response_body).encode("utf-8") + return response + + def test_handle_resolve_manifest(valid_resolve_manifest_config_file, dummy_catalog): with mock.patch.object(connector_builder.main, "handle_connector_builder_request") as patched_handle: handle_request(["read", "--config", str(valid_resolve_manifest_config_file), "--catalog", str(dummy_catalog)]) @@ -702,3 +772,109 @@ def test_read_source_single_page_single_slice(mock_http_stream): streams = source.streams(config) for s in streams: assert isinstance(s.retriever, SimpleRetrieverTestReadDecorator) + + +@pytest.mark.parametrize( + "deployment_mode, url_base, expected_error", + [ + pytest.param("CLOUD", "https://airbyte.com/api/v1/characters", None, id="test_cloud_read_with_public_endpoint"), + pytest.param("CLOUD", "https://10.0.27.27", "ValueError", id="test_cloud_read_with_private_endpoint"), + pytest.param("CLOUD", "https://localhost:80/api/v1/cast", "ValueError", id="test_cloud_read_with_localhost"), + pytest.param("CLOUD", "http://unsecured.protocol/api/v1", "ValueError", id="test_cloud_read_with_unsecured_endpoint"), + pytest.param("OSS", "https://airbyte.com/api/v1/", None, id="test_oss_read_with_public_endpoint"), + pytest.param("OSS", "https://10.0.27.27/api/v1/", None, id="test_oss_read_with_private_endpoint"), + ] +) +@patch.object(requests.Session, "send", _mocked_send) +def test_handle_read_external_requests(deployment_mode, url_base, expected_error): + """ + This test acts like an integration test for the connector builder when it receives Test Read requests. + + The scenario being tested is whether requests should be denied if they are done on an unsecure channel or are made to internal + endpoints when running on Cloud or OSS deployments + """ + + limits = TestReadLimits(max_records=100, max_pages_per_slice=1, max_slices=1) + + catalog = ConfiguredAirbyteCatalog(streams=[ + ConfiguredAirbyteStream( + stream=AirbyteStream( + name=_stream_name, + json_schema={}, + supported_sync_modes=[SyncMode.full_refresh]), + sync_mode=SyncMode.full_refresh, + destination_sync_mode=DestinationSyncMode.append, + ) + ]) + + test_manifest = MANIFEST + test_manifest["streams"][0]["$parameters"]["url_base"] = url_base + config = {"__injected_declarative_manifest": test_manifest} + + source = create_source(config, limits) + + with mock.patch.dict(os.environ, {"DEPLOYMENT_MODE": deployment_mode}, clear=False): + output_data = read_stream(source, config, catalog, limits).record.data + if expected_error: + assert len(output_data["logs"]) > 0, "Expected at least one log message with the expected error" + error_message = output_data["logs"][0] + assert error_message["level"] == "ERROR" + assert expected_error in error_message["message"] + else: + page_records = output_data["slices"][0]["pages"][0] + assert len(page_records) == len(MOCK_RESPONSE["result"]) + + +@pytest.mark.parametrize( + "deployment_mode, token_url, expected_error", + [ + pytest.param("CLOUD", "https://airbyte.com/tokens/bearer", None, id="test_cloud_read_with_public_endpoint"), + pytest.param("CLOUD", "https://10.0.27.27/tokens/bearer", "ValueError", id="test_cloud_read_with_private_endpoint"), + pytest.param("CLOUD", "http://unsecured.protocol/tokens/bearer", "ValueError", id="test_cloud_read_with_unsecured_endpoint"), + pytest.param("OSS", "https://airbyte.com/tokens/bearer", None, id="test_oss_read_with_public_endpoint"), + pytest.param("OSS", "https://10.0.27.27/tokens/bearer", None, id="test_oss_read_with_private_endpoint"), + ] +) +@patch.object(requests.Session, "send", _mocked_send) +def test_handle_read_external_oauth_request(deployment_mode, token_url, expected_error): + """ + This test acts like an integration test for the connector builder when it receives Test Read requests. + + The scenario being tested is whether requests should be denied if they are done on an unsecure channel or are made to internal + endpoints when running on Cloud or OSS deployments + """ + + limits = TestReadLimits(max_records=100, max_pages_per_slice=1, max_slices=1) + + catalog = ConfiguredAirbyteCatalog(streams=[ + ConfiguredAirbyteStream( + stream=AirbyteStream( + name=_stream_name, + json_schema={}, + supported_sync_modes=[SyncMode.full_refresh]), + sync_mode=SyncMode.full_refresh, + destination_sync_mode=DestinationSyncMode.append, + ) + ]) + + oauth_authenticator_config: dict[str, str] = { + "type": "OAuthAuthenticator", + "token_refresh_endpoint": token_url, + "client_id": "greta", + "client_secret": "teo", + "refresh_token": "john", + } + + test_manifest = MANIFEST + test_manifest["definitions"]["retriever"]["requester"]["authenticator"] = oauth_authenticator_config + config = {"__injected_declarative_manifest": test_manifest} + + source = create_source(config, limits) + + with mock.patch.dict(os.environ, {"DEPLOYMENT_MODE": deployment_mode}, clear=False): + output_data = read_stream(source, config, catalog, limits).record.data + if expected_error: + assert len(output_data["logs"]) > 0, "Expected at least one log message with the expected error" + error_message = output_data["logs"][0] + assert error_message["level"] == "ERROR" + assert expected_error in error_message["message"] diff --git a/airbyte-cdk/python/unit_tests/sources/fixtures/__init__.py b/airbyte-cdk/python/unit_tests/sources/fixtures/__init__.py new file mode 100644 index 00000000000..46b7376756e --- /dev/null +++ b/airbyte-cdk/python/unit_tests/sources/fixtures/__init__.py @@ -0,0 +1,3 @@ +# +# Copyright (c) 2021 Airbyte, Inc., all rights reserved. +# diff --git a/airbyte-cdk/python/unit_tests/sources/fixtures/source_test_fixture.py b/airbyte-cdk/python/unit_tests/sources/fixtures/source_test_fixture.py new file mode 100644 index 00000000000..101b5dced98 --- /dev/null +++ b/airbyte-cdk/python/unit_tests/sources/fixtures/source_test_fixture.py @@ -0,0 +1,147 @@ +# +# Copyright (c) 2023 Airbyte, Inc., all rights reserved. +# + +import json +import logging +from abc import ABC +from typing import Any, Iterable, List, Mapping, Optional, Tuple, Union + +import requests +from airbyte_cdk.models import ( + AirbyteStream, + ConfiguredAirbyteCatalog, + ConfiguredAirbyteStream, + ConnectorSpecification, + DestinationSyncMode, + SyncMode, +) +from airbyte_cdk.sources import AbstractSource +from airbyte_cdk.sources.streams import Stream +from airbyte_cdk.sources.streams.http import HttpStream +from airbyte_cdk.sources.streams.http.requests_native_auth import Oauth2Authenticator +from requests.auth import AuthBase + + +class SourceTestFixture(AbstractSource): + """ + This is a concrete implementation of a Source connector that provides implementations of all the methods needed to run sync + operations. For simplicity, it also overrides functions that read from files in favor of returning the data directly avoiding + the need to load static files (ex. spec.yaml, config.json, configured_catalog.json) into the unit-test package. + """ + def __init__(self, streams: Optional[List[Stream]] = None, authenticator: Optional[AuthBase] = None): + self._streams = streams + self._authenticator = authenticator + + def spec(self, logger: logging.Logger) -> ConnectorSpecification: + return ConnectorSpecification(connectionSpecification={ + "$schema": "http://json-schema.org/draft-07/schema#", + "title": "Test Fixture Spec", + "type": "object", + "required": ["api_token"], + "properties": { + "api_token": { + "type": "string", + "title": "API token", + "description": "The token used to authenticate requests to the API.", + "airbyte_secret": True + } + } + }) + + def read_config(self, config_path: str) -> Mapping[str, Any]: + return { + "api_token": "just_some_token" + } + + @classmethod + def read_catalog(cls, catalog_path: str) -> ConfiguredAirbyteCatalog: + return ConfiguredAirbyteCatalog(streams=[ + ConfiguredAirbyteStream( + stream=AirbyteStream( + name="http_test_stream", + json_schema={}, + supported_sync_modes=[SyncMode.full_refresh, SyncMode.incremental], + default_cursor_field=["updated_at"], + source_defined_cursor=True, + source_defined_primary_key=[["id"]] + ), + sync_mode=SyncMode.full_refresh, + destination_sync_mode=DestinationSyncMode.overwrite, + ) + ]) + + def check_connection(self, *args, **kwargs) -> Tuple[bool, Optional[Any]]: + return True, "" + + def streams(self, *args, **kwargs) -> List[Stream]: + return [HttpTestStream(authenticator=self._authenticator)] + + +class HttpTestStream(HttpStream, ABC): + url_base = "https://airbyte.com/api/v1/" + + def supports_incremental(self): + return True + + @property + def availability_strategy(self): + return None + + def primary_key(self) -> Optional[Union[str, List[str], List[List[str]]]]: + return "id" + + def path( + self, + *, + stream_state: Mapping[str, Any] = None, + stream_slice: Mapping[str, Any] = None, + next_page_token: Mapping[str, Any] = None, + ) -> str: + return "cast" + + def parse_response( + self, + response: requests.Response, + *, + stream_state: Mapping[str, Any], + stream_slice: Mapping[str, Any] = None, + next_page_token: Mapping[str, Any] = None, + ) -> Iterable[Mapping]: + body = response.json() or {} + return body["records"] + + def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]: + return None + + def get_json_schema(self) -> Mapping[str, Any]: + return {} + + +def fixture_mock_send(self, request, **kwargs) -> requests.Response: + """ + Helper method that can be used by a test to patch the Session.send() function and mock the outbound send operation to provide + faster and more reliable responses compared to actual API requests + """ + response = requests.Response() + response.request = request + response.status_code = 200 + response.headers = {"header": "value"} + response_body = {"records": [ + {"id": 1, "name": "Celine Song", "position": "director"}, + {"id": 2, "name": "Shabier Kirchner", "position": "cinematographer"}, + {"id": 3, "name": "Christopher Bear", "position": "composer"}, + {"id": 4, "name": "Daniel Rossen", "position": "composer"} + ]} + response._content = json.dumps(response_body).encode("utf-8") + return response + + +class SourceFixtureOauthAuthenticator(Oauth2Authenticator): + """ + Test OAuth authenticator that only overrides the request and response aspect of the authenticator flow + """ + def refresh_access_token(self) -> Tuple[str, int]: + response = requests.request(method="POST", url=self.get_token_refresh_endpoint(), params={}) + response.raise_for_status() + return "some_access_token", 1800 # Mock oauth response values to be used during the data retrieval step diff --git a/airbyte-cdk/python/unit_tests/sources/test_integration_source.py b/airbyte-cdk/python/unit_tests/sources/test_integration_source.py new file mode 100644 index 00000000000..01f58b5adec --- /dev/null +++ b/airbyte-cdk/python/unit_tests/sources/test_integration_source.py @@ -0,0 +1,71 @@ +# +# Copyright (c) 2023 Airbyte, Inc., all rights reserved. +# + +import os +from unittest import mock +from unittest.mock import patch + +import pytest +import requests +from airbyte_cdk.entrypoint import launch +from unit_tests.sources.fixtures.source_test_fixture import ( + HttpTestStream, + SourceFixtureOauthAuthenticator, + SourceTestFixture, + fixture_mock_send, +) + + +@pytest.mark.parametrize( + "deployment_mode, url_base, expected_records, expected_error", + [ + pytest.param("CLOUD", "https://airbyte.com/api/v1/", [], None, id="test_cloud_read_with_public_endpoint"), + pytest.param("CLOUD", "http://unsecured.com/api/v1/", [], ValueError, id="test_cloud_read_with_unsecured_url"), + pytest.param("CLOUD", "https://172.20.105.99/api/v1/", [], ValueError, id="test_cloud_read_with_private_endpoint"), + pytest.param("CLOUD", "https://localhost:80/api/v1/", [], ValueError, id="test_cloud_read_with_localhost"), + pytest.param("OSS", "https://airbyte.com/api/v1/", [], None, id="test_oss_read_with_public_endpoint"), + pytest.param("OSS", "https://172.20.105.99/api/v1/", [], None, id="test_oss_read_with_private_endpoint"), + ] +) +@patch.object(requests.Session, "send", fixture_mock_send) +def test_external_request_source(capsys, deployment_mode, url_base, expected_records, expected_error): + source = SourceTestFixture() + + with mock.patch.dict(os.environ, {"DEPLOYMENT_MODE": deployment_mode}, clear=False): # clear=True clears the existing os.environ dict + with mock.patch.object(HttpTestStream, 'url_base', url_base): + args = ['read', '--config', 'config.json', '--catalog', 'configured_catalog.json'] + if expected_error: + with pytest.raises(expected_error): + launch(source, args) + else: + launch(source, args) + + +@pytest.mark.parametrize( + "deployment_mode, token_refresh_url, expected_records, expected_error", + [ + pytest.param("CLOUD", "https://airbyte.com/api/v1/", [], None, id="test_cloud_read_with_public_endpoint"), + pytest.param("CLOUD", "http://unsecured.com/api/v1/", [], ValueError, id="test_cloud_read_with_unsecured_url"), + pytest.param("CLOUD", "https://172.20.105.99/api/v1/", [], ValueError, id="test_cloud_read_with_private_endpoint"), + pytest.param("OSS", "https://airbyte.com/api/v1/", [], None, id="test_oss_read_with_public_endpoint"), + pytest.param("OSS", "https://172.20.105.99/api/v1/", [], None, id="test_oss_read_with_private_endpoint"), + ] +) +@patch.object(requests.Session, "send", fixture_mock_send) +def test_external_oauth_request_source(deployment_mode, token_refresh_url, expected_records, expected_error): + oauth_authenticator = SourceFixtureOauthAuthenticator( + client_id="nora", + client_secret="hae_sung", + refresh_token="arthur", + token_refresh_endpoint=token_refresh_url + ) + source = SourceTestFixture(authenticator=oauth_authenticator) + + with mock.patch.dict(os.environ, {"DEPLOYMENT_MODE": deployment_mode}, clear=False): # clear=True clears the existing os.environ dict + args = ['read', '--config', 'config.json', '--catalog', 'configured_catalog.json'] + if expected_error: + with pytest.raises(expected_error): + launch(source, args) + else: + launch(source, args) diff --git a/airbyte-cdk/python/unit_tests/test_entrypoint.py b/airbyte-cdk/python/unit_tests/test_entrypoint.py index 047e94adfa2..5cd37e5d503 100644 --- a/airbyte-cdk/python/unit_tests/test_entrypoint.py +++ b/airbyte-cdk/python/unit_tests/test_entrypoint.py @@ -2,13 +2,15 @@ # Copyright (c) 2023 Airbyte, Inc., all rights reserved. # - +import os from argparse import Namespace from copy import deepcopy from typing import Any, List, Mapping, MutableMapping, Union -from unittest.mock import MagicMock +from unittest import mock +from unittest.mock import MagicMock, patch import pytest +import requests from airbyte_cdk import AirbyteEntrypoint from airbyte_cdk import entrypoint as entrypoint_module from airbyte_cdk.models import ( @@ -258,3 +260,39 @@ def test_run_read_with_exception(entrypoint: AirbyteEntrypoint, mocker, spec_moc def test_invalid_command(entrypoint: AirbyteEntrypoint, config_mock): with pytest.raises(Exception): list(entrypoint.run(Namespace(command="invalid", config="conf"))) + + +@pytest.mark.parametrize( + "deployment_mode, url, expected_error", + [ + pytest.param("CLOUD", "https://airbyte.com", None, id="test_cloud_public_endpoint_is_successful"), + pytest.param("CLOUD", "https://192.168.27.30", ValueError, id="test_cloud_private_ip_address_is_rejected"), + pytest.param("CLOUD", "https://localhost:8080/api/v1/cast", ValueError, id="test_cloud_private_endpoint_is_rejected"), + pytest.param("CLOUD", "http://past.lives.net/api/v1/inyun", ValueError, id="test_cloud_unsecured_endpoint_is_rejected"), + pytest.param("CLOUD", "https://not:very/cash:443.money", ValueError, id="test_cloud_invalid_url_format"), + pytest.param("CLOUD", "https://192.168.27.30 ", ValueError, id="test_cloud_incorrect_ip_format_is_rejected"), + pytest.param("cloud", "https://192.168.27.30", ValueError, id="test_case_insensitive_cloud_environment_variable"), + pytest.param("OSS", "https://airbyte.com", None, id="test_oss_public_endpoint_is_successful"), + pytest.param("OSS", "https://192.168.27.30", None, id="test_oss_private_endpoint_is_successful"), + pytest.param("OSS", "https://localhost:8080/api/v1/cast", None, id="test_oss_private_endpoint_is_successful"), + pytest.param("OSS", "http://past.lives.net/api/v1/inyun", None, id="test_oss_unsecured_endpoint_is_successful"), + ] +) +@patch.object(requests.Session, "send", lambda self, request, **kwargs: requests.Response()) +def test_filter_internal_requests(deployment_mode, url, expected_error): + with mock.patch.dict(os.environ, {"DEPLOYMENT_MODE": deployment_mode}, clear=False): + AirbyteEntrypoint(source=MockSource()) + + session = requests.Session() + + prepared_request = requests.PreparedRequest() + prepared_request.method = "GET" + prepared_request.headers = {"header": "value"} + prepared_request.url = url + + if expected_error: + with pytest.raises(expected_error): + session.send(request=prepared_request) + else: + actual_response = session.send(request=prepared_request) + assert isinstance(actual_response, requests.Response)