1
0
mirror of synced 2025-12-21 11:01:41 -05:00
Files
airbyte/airbyte-integrations/connectors/source-mixpanel/unit_tests/utils.py
2025-04-14 20:45:36 +03:00

60 lines
2.3 KiB
Python

#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
import urllib.parse
from typing import Any, List, MutableMapping
from unittest import mock
from unittest.mock import MagicMock
from source_mixpanel import SourceMixpanel
from airbyte_cdk.models import SyncMode
from airbyte_cdk.models.airbyte_protocol import ConnectorSpecification
from airbyte_cdk.sources import Source
from airbyte_cdk.sources.declarative.declarative_stream import DeclarativeStream
from airbyte_cdk.sources.declarative.interpolation import InterpolatedString
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.sources.streams.call_rate import APIBudget
from airbyte_cdk.sources.utils.schema_helpers import check_config_against_spec_or_exit, split_config
def setup_response(status, body):
return [{"json": body, "status_code": status}]
def get_url_to_mock(stream):
if isinstance(stream, DeclarativeStream):
return urllib.parse.urljoin(stream.retriever.requester._url_base.eval(config=stream.config), stream.retriever.requester.path)
return urllib.parse.urljoin(stream.url_base, stream.path())
def command_check(source: Source, config):
logger = mock.MagicMock()
connector_config, _ = split_config(config)
if source.check_config_against_spec:
source_spec: ConnectorSpecification = source.spec(logger)
check_config_against_spec_or_exit(connector_config, source_spec)
return source.check(logger, config)
def read_incremental(stream_instance: Stream, stream_state: MutableMapping[str, Any], cursor_field: List[str] = None):
res = []
stream_instance.state = stream_state
slices = stream_instance.stream_slices(sync_mode=SyncMode.incremental, cursor_field=cursor_field, stream_state=stream_state)
for slice in slices:
records = stream_instance.read_records(
sync_mode=SyncMode.incremental, cursor_field=cursor_field, stream_slice=slice, stream_state=stream_state
)
for record in records:
stream_state = stream_instance.get_updated_state(stream_state, record)
res.append(record)
return res
def init_stream(name="", config=None):
streams = SourceMixpanel(MagicMock(), config, MagicMock()).streams(config)
for stream in streams:
if stream.name == name:
return stream