1
0
mirror of synced 2025-12-20 18:39:31 -05:00
Files
airbyte/airbyte-integrations/connectors/source-sentry/unit_tests/conftest.py
suisuixia42 f109a380c6 mock unit test for source sentry (#70234)
Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com>
Co-authored-by: Aaron ("AJ") Steers <aj@airbyte.io>
2025-12-08 13:29:51 -08:00

64 lines
2.0 KiB
Python

# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
import os
import sys
from pathlib import Path
from pytest import fixture
from airbyte_cdk.sources.declarative.yaml_declarative_source import YamlDeclarativeSource
from airbyte_cdk.test.catalog_builder import CatalogBuilder
from airbyte_cdk.test.state_builder import StateBuilder
# Load CDK's manifest-only test fixtures
pytest_plugins = ["airbyte_cdk.test.utils.manifest_only_fixtures"]
# Set up request cache path
os.environ["REQUEST_CACHE_PATH"] = "REQUEST_CACHE_PATH"
def _get_manifest_path() -> Path:
"""
Find manifest.yaml location.
In CI (Docker): /airbyte/integration_code/source_declarative_manifest/manifest.yaml
Locally: ../manifest.yaml (relative to unit_tests/)
"""
ci_path = Path("/airbyte/integration_code/source_declarative_manifest")
if ci_path.exists():
return ci_path
return Path(__file__).parent.parent # Local: parent of unit_tests/
_SOURCE_FOLDER_PATH = _get_manifest_path()
_YAML_FILE_PATH = _SOURCE_FOLDER_PATH / "manifest.yaml"
# Add to path to allow importing custom components (if you have components.py)
sys.path.append(str(_SOURCE_FOLDER_PATH))
def get_source(config, state=None) -> YamlDeclarativeSource:
"""
Create a YamlDeclarativeSource instance for testing.
This is the main entry point for running your connector in tests.
"""
catalog = CatalogBuilder().build()
state = StateBuilder().build() if not state else state
return YamlDeclarativeSource(path_to_yaml=str(_YAML_FILE_PATH), catalog=catalog, config=config, state=state)
@fixture(autouse=True)
def clear_cache_before_each_test():
"""
CRITICAL: Clear request cache before each test!
Without this, cached responses from one test will affect other tests.
"""
cache_dir = Path(os.getenv("REQUEST_CACHE_PATH"))
if cache_dir.exists() and cache_dir.is_dir():
for file_path in cache_dir.glob("*.sqlite"):
file_path.unlink()
yield # Test runs here