1
0
mirror of synced 2025-12-21 02:51:29 -05:00
Files
airbyte/airbyte-integrations/connectors/source-google-sheets/unit_tests/conftest.py
2025-07-07 12:15:11 -04:00

53 lines
1.5 KiB
Python

#
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
#
import sys
from pathlib import Path
import pytest
from airbyte_cdk.models.airbyte_protocol import AirbyteStream, ConfiguredAirbyteStream, DestinationSyncMode, SyncMode
# Ask pytest to load the Airbyte CDK's manifest-only fixtures module as a plugin for this test suite.
pytest_plugins = ["airbyte_cdk.test.utils.manifest_only_fixtures"]
def _get_manifest_path() -> Path:
    """Return the folder to use as the connector's source directory.

    The fixed location ``/airbyte/integration_code/source_declarative_manifest``
    is returned when it exists on disk. Otherwise the parent of the directory
    containing this file is returned.

    Returns:
        The path of the selected directory.
    """
    source_declarative_manifest_path = Path("/airbyte/integration_code/source_declarative_manifest")
    if source_declarative_manifest_path.exists():
        return source_declarative_manifest_path
    # Fixed location is absent: fall back to the directory one level above this file's folder.
    return Path(__file__).parent.parent
# Resolve the connector folder once, at import time, and derive the manifest location from it.
_SOURCE_FOLDER_PATH = _get_manifest_path()
_YAML_FILE_PATH = _SOURCE_FOLDER_PATH.joinpath("manifest.yaml")
# Put the connector folder on the import path so that custom components can be loaded.
sys.path.append(str(_SOURCE_FOLDER_PATH))
@pytest.fixture
def invalid_config():
    """Return a connector config made entirely of placeholder values.

    The mapping carries a ``spreadsheet_id`` and a ``credentials`` section
    (``auth_type`` of ``"Client"`` plus client id, client secret and refresh
    token), none of which are real values.
    """
    return {
        "spreadsheet_id": "invalid_spreadsheet_id",
        "credentials": {
            "auth_type": "Client",
            "client_id": "fake_client_id",
            "client_secret": "fake_client_secret",
            "refresh_token": "fake_refresh_token",
        },
    }
@pytest.fixture
def catalog():
    """Provide a factory that turns ``(name, json_schema)`` pairs into configured streams.

    The returned callable is a generator function: it lazily yields one
    ``ConfiguredAirbyteStream`` per pair, each configured for full-refresh
    sync with the overwrite destination mode. Because the result is a
    generator, it can be iterated only once; wrap it in ``list(...)`` if it
    has to be reused.
    """

    def maker(*name_schema_pairs):
        for name, schema in name_schema_pairs:
            yield ConfiguredAirbyteStream(
                # Pass the enum member, not its raw string, so the declared sync modes
                # have the same type as the ``sync_mode`` argument below.
                stream=AirbyteStream(name=name, json_schema=schema, supported_sync_modes=[SyncMode.full_refresh]),
                sync_mode=SyncMode.full_refresh,
                destination_sync_mode=DestinationSyncMode.overwrite,
            )

    return maker