Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com>
60 lines
2.3 KiB
Python
60 lines
2.3 KiB
Python
#
|
|
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
|
|
#
|
|
|
|
import urllib.parse
|
|
from typing import Any, List, MutableMapping
|
|
from unittest import mock
|
|
from unittest.mock import MagicMock
|
|
|
|
from source_mixpanel import SourceMixpanel
|
|
|
|
from airbyte_cdk.models import SyncMode
|
|
from airbyte_cdk.models.airbyte_protocol import ConnectorSpecification
|
|
from airbyte_cdk.sources import Source
|
|
from airbyte_cdk.sources.declarative.declarative_stream import DeclarativeStream
|
|
from airbyte_cdk.sources.declarative.interpolation import InterpolatedString
|
|
from airbyte_cdk.sources.streams import Stream
|
|
from airbyte_cdk.sources.streams.call_rate import APIBudget
|
|
from airbyte_cdk.sources.utils.schema_helpers import check_config_against_spec_or_exit, split_config
|
|
|
|
|
|
def setup_response(status, body):
|
|
return [{"json": body, "status_code": status}]
|
|
|
|
|
|
def get_url_to_mock(stream):
|
|
if isinstance(stream, DeclarativeStream):
|
|
return urllib.parse.urljoin(stream.retriever.requester._url_base.eval(config=stream.config), stream.retriever.requester.path)
|
|
return urllib.parse.urljoin(stream.url_base, stream.path())
|
|
|
|
|
|
def command_check(source: Source, config):
|
|
logger = mock.MagicMock()
|
|
connector_config, _ = split_config(config)
|
|
if source.check_config_against_spec:
|
|
source_spec: ConnectorSpecification = source.spec(logger)
|
|
check_config_against_spec_or_exit(connector_config, source_spec)
|
|
return source.check(logger, config)
|
|
|
|
|
|
def read_incremental(stream_instance: Stream, stream_state: MutableMapping[str, Any], cursor_field: List[str] = None):
|
|
res = []
|
|
stream_instance.state = stream_state
|
|
slices = stream_instance.stream_slices(sync_mode=SyncMode.incremental, cursor_field=cursor_field, stream_state=stream_state)
|
|
for slice in slices:
|
|
records = stream_instance.read_records(
|
|
sync_mode=SyncMode.incremental, cursor_field=cursor_field, stream_slice=slice, stream_state=stream_state
|
|
)
|
|
for record in records:
|
|
stream_state = stream_instance.get_updated_state(stream_state, record)
|
|
res.append(record)
|
|
return res
|
|
|
|
|
|
def init_stream(name="", config=None):
|
|
streams = SourceMixpanel(MagicMock(), config, MagicMock()).streams(config)
|
|
for stream in streams:
|
|
if stream.name == name:
|
|
return stream
|