
Emit final state message for full refresh syncs and consolidate read flows (#35622)

Brian Lai authored on 2024-03-05 01:05:06 -05:00, committed by GitHub
parent e11f0fefc1
commit ef98194673
23 changed files with 1467 additions and 830 deletions
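For orientation, the final state message that a full refresh sync now emits can be sketched from the pieces introduced below (FULL_REFRESH_SENTINEL_STATE_KEY in streams/core.py and the per-stream create_state_message in the state manager); the stream name and the assumption that this stream has no usable cursor are illustrative, not taken from a real sync:

from airbyte_cdk.models import (
    AirbyteMessage,
    AirbyteStateBlob,
    AirbyteStateMessage,
    AirbyteStateType,
    AirbyteStreamState,
    StreamDescriptor,
)
from airbyte_cdk.models import Type as MessageType

# A stream with no usable cursor checkpoints a sentinel blob at the end of the sync;
# streams that do have a cursor checkpoint their real state instead.
final_state = AirbyteMessage(
    type=MessageType.STATE,
    state=AirbyteStateMessage(
        type=AirbyteStateType.STREAM,
        stream=AirbyteStreamState(
            stream_descriptor=StreamDescriptor(name="stream1"),
            stream_state=AirbyteStateBlob.parse_obj({"__ab_full_refresh_state_message": True}),
        ),
    ),
)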

View File

@@ -23,7 +23,7 @@ from airbyte_cdk.models import Type as MessageType
from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager
from airbyte_cdk.sources.message import InMemoryMessageRepository, MessageRepository
from airbyte_cdk.sources.source import Source
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.sources.streams import FULL_REFRESH_SENTINEL_STATE_KEY, Stream
from airbyte_cdk.sources.streams.core import StreamData
from airbyte_cdk.sources.streams.http.http import HttpStream
from airbyte_cdk.sources.utils.record_helper import stream_data_to_airbyte_message
@@ -181,10 +181,6 @@ class AbstractSource(Source, ABC):
def raise_exception_on_missing_stream(self) -> bool:
return True
@property
def per_stream_state_enabled(self) -> bool:
return True
def _read_stream(
self,
logger: logging.Logger,
@@ -206,22 +202,32 @@ class AbstractSource(Source, ABC):
)
stream_instance.log_stream_sync_configuration()
use_incremental = configured_stream.sync_mode == SyncMode.incremental and stream_instance.supports_incremental
if use_incremental:
record_iterator = self._read_incremental(
logger,
stream_instance,
configured_stream,
state_manager,
internal_config,
)
else:
record_iterator = self._read_full_refresh(logger, stream_instance, configured_stream, internal_config)
stream_name = configured_stream.stream.name
# The platform always passes stream state regardless of sync mode. We shouldn't need to consider this case within the
# connector, but right now we need to prevent accidental usage of the previous stream state
stream_state = (
state_manager.get_stream_state(stream_name, stream_instance.namespace)
if configured_stream.sync_mode == SyncMode.incremental
else {}
)
if stream_state and "state" in dir(stream_instance) and not self._stream_state_is_full_refresh(stream_state):
stream_instance.state = stream_state # type: ignore # we check that state in the dir(stream_instance)
logger.info(f"Setting state of {self.name} stream to {stream_state}")
record_iterator = stream_instance.read(
configured_stream,
logger,
self._slice_logger,
stream_state,
state_manager,
internal_config,
)
record_counter = 0
stream_name = configured_stream.stream.name
logger.info(f"Syncing stream: {stream_name} ")
for record in record_iterator:
for record_data_or_message in record_iterator:
record = self._get_message(record_data_or_message, stream_instance)
if record.type == MessageType.RECORD:
record_counter += 1
if record_counter == 1:
@@ -233,62 +239,11 @@ class AbstractSource(Source, ABC):
logger.info(f"Read {record_counter} records from {stream_name} stream")
def _read_incremental(
self,
logger: logging.Logger,
stream_instance: Stream,
configured_stream: ConfiguredAirbyteStream,
state_manager: ConnectorStateManager,
internal_config: InternalConfig,
) -> Iterator[AirbyteMessage]:
"""Read stream using incremental algorithm
:param logger:
:param stream_instance:
:param configured_stream:
:param state_manager:
:param internal_config:
:return:
"""
stream_name = configured_stream.stream.name
stream_state = state_manager.get_stream_state(stream_name, stream_instance.namespace)
if stream_state and "state" in dir(stream_instance):
stream_instance.state = stream_state # type: ignore # we check that state in the dir(stream_instance)
logger.info(f"Setting state of {self.name} stream to {stream_state}")
for record_data_or_message in stream_instance.read_incremental(
configured_stream.cursor_field,
logger,
self._slice_logger,
stream_state,
state_manager,
self.per_stream_state_enabled,
internal_config,
):
yield self._get_message(record_data_or_message, stream_instance)
def _emit_queued_messages(self) -> Iterable[AirbyteMessage]:
if self.message_repository:
yield from self.message_repository.consume_queue()
return
def _read_full_refresh(
self,
logger: logging.Logger,
stream_instance: Stream,
configured_stream: ConfiguredAirbyteStream,
internal_config: InternalConfig,
) -> Iterator[AirbyteMessage]:
total_records_counter = 0
for record_data_or_message in stream_instance.read_full_refresh(configured_stream.cursor_field, logger, self._slice_logger):
message = self._get_message(record_data_or_message, stream_instance)
yield message
if message.type == MessageType.RECORD:
total_records_counter += 1
if internal_config.is_limit_reached(total_records_counter):
return
def _get_message(self, record_data_or_message: Union[StreamData, AirbyteMessage], stream: Stream) -> AirbyteMessage:
"""
Converts the input to an AirbyteMessage if it is a StreamData. Returns the input as is if it is already an AirbyteMessage
@@ -317,3 +272,9 @@ class AbstractSource(Source, ABC):
def _generate_failed_streams_error_message(stream_failures: Mapping[str, AirbyteTracedException]) -> str:
failures = ", ".join([f"{stream}: {filter_secrets(exception.__repr__())}" for stream, exception in stream_failures.items()])
return f"During the sync, the following streams did not sync successfully: {failures}"
@staticmethod
def _stream_state_is_full_refresh(stream_state: Mapping[str, Any]) -> bool:
# For full refresh syncs that don't have a suitable cursor value, we emit a state that contains a sentinel key.
# This key is never used by a connector and is needed during a read to skip assigning the incoming state.
return FULL_REFRESH_SENTINEL_STATE_KEY in stream_state
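The new _stream_state_is_full_refresh guard keys off the sentinel constant defined in streams/core.py further down. A minimal standalone sketch of its behavior (the "created_at" cursor value is an illustrative assumption):

FULL_REFRESH_SENTINEL_STATE_KEY = "__ab_full_refresh_state_message"

def _stream_state_is_full_refresh(stream_state):
    # The sentinel key is only written by the CDK itself, so its presence means the
    # incoming "state" carries no real cursor and should not be assigned to the stream.
    return FULL_REFRESH_SENTINEL_STATE_KEY in stream_state

assert _stream_state_is_full_refresh({"__ab_full_refresh_state_message": True})
assert not _stream_state_is_full_refresh({"created_at": "2024-01-01T00:00:00Z"})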

View File

@@ -77,7 +77,7 @@ class ConnectorStateManager:
stream_descriptor = HashableStreamDescriptor(name=stream_name, namespace=namespace)
self.per_stream_states[stream_descriptor] = AirbyteStateBlob.parse_obj(value)
def create_state_message(self, stream_name: str, namespace: Optional[str], send_per_stream_state: bool) -> AirbyteMessage:
def create_state_message(self, stream_name: str, namespace: Optional[str]) -> AirbyteMessage:
"""
Generates an AirbyteMessage using the current per-stream state of a specified stream in either the per-stream or legacy format
:param stream_name: The name of the stream for the message that is being created
@@ -85,25 +85,18 @@ class ConnectorStateManager:
:param send_per_stream_state: Decides which state format the message should be generated as
:return: The Airbyte state message to be emitted by the connector during a sync
"""
if send_per_stream_state:
hashable_descriptor = HashableStreamDescriptor(name=stream_name, namespace=namespace)
stream_state = self.per_stream_states.get(hashable_descriptor) or AirbyteStateBlob()
hashable_descriptor = HashableStreamDescriptor(name=stream_name, namespace=namespace)
stream_state = self.per_stream_states.get(hashable_descriptor) or AirbyteStateBlob()
# According to the Airbyte protocol, the StreamDescriptor namespace field is not required. However, the platform will throw
# a validation error if it receives namespace=null. That is why if namespace is None, the field should be omitted instead.
stream_descriptor = (
StreamDescriptor(name=stream_name) if namespace is None else StreamDescriptor(name=stream_name, namespace=namespace)
)
return AirbyteMessage(
type=MessageType.STATE,
state=AirbyteStateMessage(
type=AirbyteStateType.STREAM,
stream=AirbyteStreamState(stream_descriptor=stream_descriptor, stream_state=stream_state),
data=dict(self._get_legacy_state()),
return AirbyteMessage(
type=MessageType.STATE,
state=AirbyteStateMessage(
type=AirbyteStateType.STREAM,
stream=AirbyteStreamState(
stream_descriptor=StreamDescriptor(name=stream_name, namespace=namespace), stream_state=stream_state
),
)
return AirbyteMessage(type=MessageType.STATE, state=AirbyteStateMessage(data=dict(self._get_legacy_state())))
),
)
@classmethod
def _extract_from_state_message(
@@ -176,13 +169,6 @@ class ConnectorStateManager:
streams[stream_descriptor] = AirbyteStateBlob.parse_obj(state_value or {})
return streams
def _get_legacy_state(self) -> Mapping[str, Any]:
"""
Using the current per-stream state, creates a mapping of all the stream states for the connector being synced
:return: A deep copy of the mapping of stream name to stream state value
"""
return {descriptor.name: state.dict() if state else {} for descriptor, state in self.per_stream_states.items()}
@staticmethod
def _is_legacy_dict_state(state: Union[List[AirbyteStateMessage], MutableMapping[str, Any]]) -> bool:
return isinstance(state, dict)
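With send_per_stream_state removed, callers always get the per-stream message format. A hedged usage sketch (the constructor arguments and the created_at cursor field are assumptions for illustration, not taken from the diff):

from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager

manager = ConnectorStateManager(stream_instance_map={}, state=[])  # assumed constructor arguments
manager.update_state_for_stream("stream1", None, {"created_at": "2024-01-01T00:00:00Z"})

message = manager.create_state_message("stream1", None)
# The state now always rides on message.state.stream; the legacy message.state.data
# blob built by _get_legacy_state() is no longer populated.
assert message.state.stream.stream_descriptor.name == "stream1"
assert message.state.stream.stream_state.created_at == "2024-01-01T00:00:00Z"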

View File

@@ -7,7 +7,7 @@ import logging
from functools import lru_cache
from typing import TYPE_CHECKING, Any, Iterable, List, Mapping, MutableMapping, Optional, Union
from airbyte_cdk.models import AirbyteLogMessage, AirbyteMessage, Level, SyncMode, Type
from airbyte_cdk.models import AirbyteLogMessage, AirbyteMessage, ConfiguredAirbyteStream, Level, SyncMode, Type
from airbyte_cdk.sources import AbstractSource
from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager
from airbyte_cdk.sources.file_based.availability_strategy import (
@@ -156,29 +156,13 @@ class FileBasedStreamFacade(AbstractStreamFacade[DefaultStream], AbstractFileBas
def get_underlying_stream(self) -> DefaultStream:
return self._abstract_stream
def read_full_refresh(
def read(
self,
cursor_field: Optional[List[str]],
logger: logging.Logger,
slice_logger: SliceLogger,
) -> Iterable[StreamData]:
"""
Read full refresh. Delegate to the underlying AbstractStream, ignoring all the parameters
:param cursor_field: (ignored)
:param logger: (ignored)
:param slice_logger: (ignored)
:return: Iterable of StreamData
"""
yield from self._read_records()
def read_incremental(
self,
cursor_field: Optional[List[str]],
configured_stream: ConfiguredAirbyteStream,
logger: logging.Logger,
slice_logger: SliceLogger,
stream_state: MutableMapping[str, Any],
state_manager: ConnectorStateManager,
per_stream_state_enabled: bool,
internal_config: InternalConfig,
) -> Iterable[StreamData]:
yield from self._read_records()

View File

@@ -155,9 +155,7 @@ class FileBasedConcurrentCursor(AbstractConcurrentFileBasedCursor):
self._stream_namespace,
new_state,
)
state_message = self._connector_state_manager.create_state_message(
self._stream_name, self._stream_namespace, send_per_stream_state=True
)
state_message = self._connector_state_manager.create_state_message(self._stream_name, self._stream_namespace)
self._message_repository.emit_message(state_message)
def _get_new_cursor_value(self) -> str:

View File

@@ -3,6 +3,6 @@
#
# Initialize Streams Package
from .core import IncrementalMixin, Stream
from .core import FULL_REFRESH_SENTINEL_STATE_KEY, IncrementalMixin, Stream
__all__ = ["IncrementalMixin", "Stream"]
__all__ = ["FULL_REFRESH_SENTINEL_STATE_KEY", "IncrementalMixin", "Stream"]

View File

@@ -8,7 +8,7 @@ import logging
from functools import lru_cache
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Tuple, Union
from airbyte_cdk.models import AirbyteLogMessage, AirbyteMessage, AirbyteStream, Level, SyncMode, Type
from airbyte_cdk.models import AirbyteLogMessage, AirbyteMessage, AirbyteStream, ConfiguredAirbyteStream, Level, SyncMode, Type
from airbyte_cdk.sources import AbstractSource, Source
from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager
from airbyte_cdk.sources.message import MessageRepository
@@ -116,29 +116,13 @@ class StreamFacade(AbstractStreamFacade[DefaultStream], Stream):
self._slice_logger = slice_logger
self._logger = logger
def read_full_refresh(
def read(
self,
cursor_field: Optional[List[str]],
logger: logging.Logger,
slice_logger: SliceLogger,
) -> Iterable[StreamData]:
"""
Read full refresh. Delegate to the underlying AbstractStream, ignoring all the parameters
:param cursor_field: (ignored)
:param logger: (ignored)
:param slice_logger: (ignored)
:return: Iterable of StreamData
"""
yield from self._read_records()
def read_incremental(
self,
cursor_field: Optional[List[str]],
configured_stream: ConfiguredAirbyteStream,
logger: logging.Logger,
slice_logger: SliceLogger,
stream_state: MutableMapping[str, Any],
state_manager: ConnectorStateManager,
per_stream_state_enabled: bool,
internal_config: InternalConfig,
) -> Iterable[StreamData]:
yield from self._read_records()

View File

@@ -184,9 +184,7 @@ class ConcurrentCursor(Cursor):
# TODO: if we migrate stored state to the concurrent state format
# (aka stop calling self._connector_state_converter.convert_to_sequential_state`), we'll need to cast datetimes to string or
# int before emitting state
state_message = self._connector_state_manager.create_state_message(
self._stream_name, self._stream_namespace, send_per_stream_state=True
)
state_message = self._connector_state_manager.create_state_message(self._stream_name, self._stream_namespace)
self._message_repository.emit_message(state_message)
def _merge_partitions(self) -> None:

View File

@@ -11,7 +11,7 @@ from functools import lru_cache
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Tuple, Union
import airbyte_cdk.sources.utils.casing as casing
from airbyte_cdk.models import AirbyteMessage, AirbyteStream, SyncMode
from airbyte_cdk.models import AirbyteMessage, AirbyteStream, ConfiguredAirbyteStream, SyncMode
from airbyte_cdk.models import Type as MessageType
# list of all possible HTTP methods which can be used for sending of request bodies
@@ -31,6 +31,10 @@ StreamData = Union[Mapping[str, Any], AirbyteMessage]
JsonSchema = Mapping[str, Any]
# Streams that only support full refresh don't have a suitable cursor so this sentinel
# value is used to indicate that stream should not load the incoming state value
FULL_REFRESH_SENTINEL_STATE_KEY = "__ab_full_refresh_state_message"
def package_name_from_class(cls: object) -> str:
"""Find the package name given a class name"""
@@ -107,39 +111,24 @@ class Stream(ABC):
"""
return None
def read_full_refresh(
def read( # type: ignore # ignoring typing for ConnectorStateManager because of circular dependencies
self,
cursor_field: Optional[List[str]],
logger: logging.Logger,
slice_logger: SliceLogger,
) -> Iterable[StreamData]:
slices = self.stream_slices(sync_mode=SyncMode.full_refresh, cursor_field=cursor_field)
logger.debug(f"Processing stream slices for {self.name} (sync_mode: full_refresh)", extra={"stream_slices": slices})
for _slice in slices:
if slice_logger.should_log_slice_message(logger):
yield slice_logger.create_slice_log_message(_slice)
yield from self.read_records(
stream_slice=_slice,
sync_mode=SyncMode.full_refresh,
cursor_field=cursor_field,
)
def read_incremental( # type: ignore # ignoring typing for ConnectorStateManager because of circular dependencies
self,
cursor_field: Optional[List[str]],
configured_stream: ConfiguredAirbyteStream,
logger: logging.Logger,
slice_logger: SliceLogger,
stream_state: MutableMapping[str, Any],
state_manager,
per_stream_state_enabled: bool,
internal_config: InternalConfig,
) -> Iterable[StreamData]:
sync_mode = configured_stream.sync_mode
cursor_field = configured_stream.cursor_field
slices = self.stream_slices(
cursor_field=cursor_field,
sync_mode=SyncMode.incremental,
sync_mode=sync_mode, # todo: change this interface to no longer rely on sync_mode for behavior
stream_state=stream_state,
)
logger.debug(f"Processing stream slices for {self.name} (sync_mode: incremental)", extra={"stream_slices": slices})
logger.debug(f"Processing stream slices for {self.name} (sync_mode: {sync_mode.name})", extra={"stream_slices": slices})
has_slices = False
record_counter = 0
@@ -148,7 +137,7 @@ class Stream(ABC):
if slice_logger.should_log_slice_message(logger):
yield slice_logger.create_slice_log_message(_slice)
records = self.read_records(
sync_mode=SyncMode.incremental,
sync_mode=sync_mode, # todo: change this interface to no longer rely on sync_mode for behavior
stream_slice=_slice,
stream_state=stream_state,
cursor_field=cursor_field or None,
@@ -160,20 +149,34 @@ class Stream(ABC):
):
record_data = record_data_or_message if isinstance(record_data_or_message, Mapping) else record_data_or_message.record
stream_state = self.get_updated_state(stream_state, record_data)
checkpoint_interval = self.state_checkpoint_interval
record_counter += 1
if checkpoint_interval and record_counter % checkpoint_interval == 0:
yield self._checkpoint_state(stream_state, state_manager, per_stream_state_enabled)
if sync_mode == SyncMode.incremental:
# Checkpoint intervals are a bit controversial, but see below comment about why we're gating it right now
checkpoint_interval = self.state_checkpoint_interval
if checkpoint_interval and record_counter % checkpoint_interval == 0:
airbyte_state_message = self._checkpoint_state(stream_state, state_manager)
yield airbyte_state_message
if internal_config.is_limit_reached(record_counter):
break
yield self._checkpoint_state(stream_state, state_manager, per_stream_state_enabled)
if sync_mode == SyncMode.incremental:
# Even though right now, only incremental streams running as incremental mode will emit periodic checkpoints. Rather than
# overhaul how refresh interacts with the platform, this positions the code so that once we want to start emitting
# periodic checkpoints in full refresh mode it can be done here
airbyte_state_message = self._checkpoint_state(stream_state, state_manager)
yield airbyte_state_message
if not has_slices:
# Safety net to ensure we always emit at least one state message even if there are no slices
checkpoint = self._checkpoint_state(stream_state, state_manager, per_stream_state_enabled)
yield checkpoint
if not has_slices or sync_mode == SyncMode.full_refresh:
if sync_mode == SyncMode.full_refresh:
# We use a dummy state if there is no suitable value provided by full_refresh streams that do not have a valid cursor.
# Incremental streams running full_refresh mode emit a meaningful state
stream_state = stream_state or {FULL_REFRESH_SENTINEL_STATE_KEY: True}
# We should always emit a final state message for full refresh sync or streams that do not have any slices
airbyte_state_message = self._checkpoint_state(stream_state, state_manager)
yield airbyte_state_message
@abstractmethod
def read_records(
@@ -361,7 +364,6 @@ class Stream(ABC):
self,
stream_state: Mapping[str, Any],
state_manager,
per_stream_state_enabled: bool,
) -> AirbyteMessage:
# First attempt to retrieve the current state using the stream's state property. We receive an AttributeError if the state
# property is not implemented by the stream instance and as a fallback, use the stream_state retrieved from the stream
@@ -373,4 +375,4 @@ class Stream(ABC):
except AttributeError:
state_manager.update_state_for_stream(self.name, self.namespace, stream_state)
return state_manager.create_state_message(self.name, self.namespace, send_per_stream_state=per_stream_state_enabled)
return state_manager.create_state_message(self.name, self.namespace)
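Since both sync modes now flow through the single Stream.read() entry point, the caller-side contract can be sketched as follows; stream, configured_stream, logger, slice_logger, state_manager, and internal_config are assumed to be wired up the way AbstractSource._read_stream does above:

from airbyte_cdk.models import AirbyteMessage
from airbyte_cdk.models import Type as MessageType

messages = list(
    stream.read(configured_stream, logger, slice_logger, {}, state_manager, internal_config)
)
state_messages = [m for m in messages if isinstance(m, AirbyteMessage) and m.type == MessageType.STATE]
# Incremental syncs still checkpoint every state_checkpoint_interval records plus once at the end;
# full refresh syncs now always end with at least one state message, falling back to the sentinel
# blob when the stream has no cursor value to report.
assert state_messages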

View File

@@ -81,13 +81,14 @@ def test_concurrent_source_adapter():
def _mock_stream(name: str, data=[], available: bool = True):
s = Mock()
s.name = name
s.namespace = None
s.as_airbyte_stream.return_value = AirbyteStream(
name=name,
json_schema={},
supported_sync_modes=[SyncMode.full_refresh],
)
s.check_availability.return_value = (True, None) if available else (False, "not available")
s.read_full_refresh.return_value = iter(data)
s.read.return_value = iter(data)
s.primary_key = None
return s

View File

@@ -74,10 +74,8 @@ single_csv_input_state_is_earlier_scenario_concurrent = (
"stream": "stream1",
},
{
"stream1": {
"history": {"some_old_file.csv": "2023-06-01T03:54:07.000000Z", "a.csv": "2023-06-05T03:54:07.000000Z"},
"_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z_a.csv",
}
"history": {"some_old_file.csv": "2023-06-01T03:54:07.000000Z", "a.csv": "2023-06-05T03:54:07.000000Z"},
"_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z_a.csv",
},
]
)
@@ -156,10 +154,8 @@ single_csv_file_is_skipped_if_same_modified_at_as_in_history_concurrent = (
.set_expected_records(
[
{
"stream1": {
"history": {"a.csv": "2023-06-05T03:54:07.000000Z"},
"_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z_a.csv",
}
"history": {"a.csv": "2023-06-05T03:54:07.000000Z"},
"_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z_a.csv",
}
]
)
@@ -256,10 +252,8 @@ single_csv_file_is_synced_if_modified_at_is_more_recent_than_in_history_concurre
"stream": "stream1",
},
{
"stream1": {
"history": {"a.csv": "2023-06-05T03:54:07.000000Z"},
"_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z_a.csv",
}
"history": {"a.csv": "2023-06-05T03:54:07.000000Z"},
"_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z_a.csv",
},
]
)
@@ -368,10 +362,8 @@ single_csv_no_input_state_scenario_concurrent = (
"stream": "stream1",
},
{
"stream1": {
"history": {"a.csv": "2023-06-05T03:54:07.000000Z"},
"_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z_a.csv",
}
"history": {"a.csv": "2023-06-05T03:54:07.000000Z"},
"_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z_a.csv",
},
]
)
@@ -491,10 +483,8 @@ multi_csv_same_timestamp_scenario_concurrent = (
"stream": "stream1",
},
{
"stream1": {
"history": {"a.csv": "2023-06-05T03:54:07.000000Z", "b.csv": "2023-06-05T03:54:07.000000Z"},
"_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z_b.csv",
}
"history": {"a.csv": "2023-06-05T03:54:07.000000Z", "b.csv": "2023-06-05T03:54:07.000000Z"},
"_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z_b.csv",
},
]
)
@@ -583,13 +573,11 @@ single_csv_input_state_is_later_scenario_concurrent = (
"stream": "stream1",
},
{
"stream1": {
"history": {
"recent_file.csv": "2023-07-15T23:59:59.000000Z",
"a.csv": "2023-06-05T03:54:07.000000Z",
},
"_ab_source_file_last_modified": "2023-07-15T23:59:59.000000Z_recent_file.csv",
}
"history": {
"recent_file.csv": "2023-07-15T23:59:59.000000Z",
"a.csv": "2023-06-05T03:54:07.000000Z",
},
"_ab_source_file_last_modified": "2023-07-15T23:59:59.000000Z_recent_file.csv",
},
]
)
@@ -697,12 +685,10 @@ multi_csv_different_timestamps_scenario_concurrent = (
"stream": "stream1",
},
{
"stream1": {
"history": {
"a.csv": "2023-06-04T03:54:07.000000Z",
},
"_ab_source_file_last_modified": "2023-06-04T03:54:07.000000Z_a.csv",
}
"history": {
"a.csv": "2023-06-04T03:54:07.000000Z",
},
"_ab_source_file_last_modified": "2023-06-04T03:54:07.000000Z_a.csv",
},
{
"data": {
@@ -725,10 +711,8 @@ multi_csv_different_timestamps_scenario_concurrent = (
"stream": "stream1",
},
{
"stream1": {
"history": {"a.csv": "2023-06-04T03:54:07.000000Z", "b.csv": "2023-06-05T03:54:07.000000Z"},
"_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z_b.csv",
}
"history": {"a.csv": "2023-06-04T03:54:07.000000Z", "b.csv": "2023-06-05T03:54:07.000000Z"},
"_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z_b.csv",
},
]
)
@@ -856,10 +840,8 @@ multi_csv_per_timestamp_scenario_concurrent = (
"stream": "stream1",
},
{
"stream1": {
"history": {"a.csv": "2023-06-05T03:54:07.000000Z", "b.csv": "2023-06-05T03:54:07.000000Z"},
"_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z_b.csv",
}
"history": {"a.csv": "2023-06-05T03:54:07.000000Z", "b.csv": "2023-06-05T03:54:07.000000Z"},
"_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z_b.csv",
},
{
"data": {
@@ -882,14 +864,12 @@ multi_csv_per_timestamp_scenario_concurrent = (
"stream": "stream1",
},
{
"stream1": {
"history": {
"a.csv": "2023-06-05T03:54:07.000000Z",
"b.csv": "2023-06-05T03:54:07.000000Z",
"c.csv": "2023-06-06T03:54:07.000000Z",
},
"_ab_source_file_last_modified": "2023-06-06T03:54:07.000000Z_c.csv",
}
"history": {
"a.csv": "2023-06-05T03:54:07.000000Z",
"b.csv": "2023-06-05T03:54:07.000000Z",
"c.csv": "2023-06-06T03:54:07.000000Z",
},
"_ab_source_file_last_modified": "2023-06-06T03:54:07.000000Z_c.csv",
},
]
)
@@ -1001,10 +981,8 @@ multi_csv_skip_file_if_already_in_history_concurrent = (
"stream": "stream1",
},
{
"stream1": {
"history": {"a.csv": "2023-06-05T03:54:07.000000Z", "b.csv": "2023-06-05T03:54:07.000000Z"},
"_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z_b.csv",
}
"history": {"a.csv": "2023-06-05T03:54:07.000000Z", "b.csv": "2023-06-05T03:54:07.000000Z"},
"_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z_b.csv",
},
{
"data": {
@@ -1027,14 +1005,12 @@ multi_csv_skip_file_if_already_in_history_concurrent = (
"stream": "stream1",
},
{
"stream1": {
"history": {
"a.csv": "2023-06-05T03:54:07.000000Z",
"b.csv": "2023-06-05T03:54:07.000000Z",
"c.csv": "2023-06-06T03:54:07.000000Z",
},
"_ab_source_file_last_modified": "2023-06-06T03:54:07.000000Z_c.csv",
}
"history": {
"a.csv": "2023-06-05T03:54:07.000000Z",
"b.csv": "2023-06-05T03:54:07.000000Z",
"c.csv": "2023-06-06T03:54:07.000000Z",
},
"_ab_source_file_last_modified": "2023-06-06T03:54:07.000000Z_c.csv",
},
]
)
@@ -1153,14 +1129,12 @@ multi_csv_include_missing_files_within_history_range_concurrent_cursor_is_newer
# {"data": {"col1": "val11c", "col2": "val12c", "col3": "val13c"}, "stream": "stream1"}, # this file is skipped
# {"data": {"col1": "val21c", "col2": "val22c", "col3": "val23c"}, "stream": "stream1"}, # this file is skipped
{
"stream1": {
"history": {
"a.csv": "2023-06-05T03:54:07.000000Z",
"b.csv": "2023-06-05T03:54:07.000000Z",
"c.csv": "2023-06-06T03:54:07.000000Z",
},
"_ab_source_file_last_modified": "2023-06-06T03:54:07.000000Z_c.csv",
}
"history": {
"a.csv": "2023-06-05T03:54:07.000000Z",
"b.csv": "2023-06-05T03:54:07.000000Z",
"c.csv": "2023-06-06T03:54:07.000000Z",
},
"_ab_source_file_last_modified": "2023-06-06T03:54:07.000000Z_c.csv",
},
]
)
@@ -1282,14 +1256,12 @@ multi_csv_include_missing_files_within_history_range_concurrent_cursor_is_older
# {"data": {"col1": "val11c", "col2": "val12c", "col3": "val13c"}, "stream": "stream1"}, # this file is skipped
# {"data": {"col1": "val21c", "col2": "val22c", "col3": "val23c"}, "stream": "stream1"}, # this file is skipped
{
"stream1": {
"history": {
"a.csv": "2023-06-05T03:54:07.000000Z",
"b.csv": "2023-06-05T03:54:07.000000Z",
"c.csv": "2023-06-06T03:54:07.000000Z",
},
"_ab_source_file_last_modified": "2023-06-06T03:54:07.000000Z_c.csv",
}
"history": {
"a.csv": "2023-06-05T03:54:07.000000Z",
"b.csv": "2023-06-05T03:54:07.000000Z",
"c.csv": "2023-06-06T03:54:07.000000Z",
},
"_ab_source_file_last_modified": "2023-06-06T03:54:07.000000Z_c.csv",
},
]
)
@@ -1405,14 +1377,12 @@ multi_csv_remove_old_files_if_history_is_full_scenario_concurrent_cursor_is_newe
"stream": "stream1",
},
{
"stream1": {
"history": {
"very_old_file.csv": "2023-06-02T03:54:07.000000Z",
"old_file_same_timestamp_as_a.csv": "2023-06-06T03:54:07.000000Z",
"a.csv": "2023-06-06T03:54:07.000000Z",
},
"_ab_source_file_last_modified": "2023-06-06T03:54:07.000000Z_old_file_same_timestamp_as_a.csv",
}
"history": {
"very_old_file.csv": "2023-06-02T03:54:07.000000Z",
"old_file_same_timestamp_as_a.csv": "2023-06-06T03:54:07.000000Z",
"a.csv": "2023-06-06T03:54:07.000000Z",
},
"_ab_source_file_last_modified": "2023-06-06T03:54:07.000000Z_old_file_same_timestamp_as_a.csv",
},
{
"data": {
@@ -1435,14 +1405,12 @@ multi_csv_remove_old_files_if_history_is_full_scenario_concurrent_cursor_is_newe
"stream": "stream1",
},
{
"stream1": {
"history": {
"old_file_same_timestamp_as_a.csv": "2023-06-06T03:54:07.000000Z",
"a.csv": "2023-06-06T03:54:07.000000Z",
"b.csv": "2023-06-07T03:54:07.000000Z",
},
"_ab_source_file_last_modified": "2023-06-07T03:54:07.000000Z_b.csv",
}
"history": {
"old_file_same_timestamp_as_a.csv": "2023-06-06T03:54:07.000000Z",
"a.csv": "2023-06-06T03:54:07.000000Z",
"b.csv": "2023-06-07T03:54:07.000000Z",
},
"_ab_source_file_last_modified": "2023-06-07T03:54:07.000000Z_b.csv",
},
{
"data": {
@@ -1465,14 +1433,12 @@ multi_csv_remove_old_files_if_history_is_full_scenario_concurrent_cursor_is_newe
"stream": "stream1",
},
{
"stream1": {
"history": {
"old_file_same_timestamp_as_a.csv": "2023-06-06T03:54:07.000000Z",
"b.csv": "2023-06-07T03:54:07.000000Z",
"c.csv": "2023-06-10T03:54:07.000000Z",
},
"_ab_source_file_last_modified": "2023-06-10T03:54:07.000000Z_c.csv",
}
"history": {
"old_file_same_timestamp_as_a.csv": "2023-06-06T03:54:07.000000Z",
"b.csv": "2023-06-07T03:54:07.000000Z",
"c.csv": "2023-06-10T03:54:07.000000Z",
},
"_ab_source_file_last_modified": "2023-06-10T03:54:07.000000Z_c.csv",
},
]
)
@@ -1592,14 +1558,12 @@ multi_csv_remove_old_files_if_history_is_full_scenario_concurrent_cursor_is_olde
"stream": "stream1",
},
{
"stream1": {
"history": {
"very_old_file.csv": "2023-06-02T03:54:07.000000Z",
"old_file_same_timestamp_as_a.csv": "2023-06-06T03:54:07.000000Z",
"a.csv": "2023-06-06T03:54:07.000000Z",
},
"_ab_source_file_last_modified": "2023-06-06T03:54:07.000000Z_old_file_same_timestamp_as_a.csv",
}
"history": {
"very_old_file.csv": "2023-06-02T03:54:07.000000Z",
"old_file_same_timestamp_as_a.csv": "2023-06-06T03:54:07.000000Z",
"a.csv": "2023-06-06T03:54:07.000000Z",
},
"_ab_source_file_last_modified": "2023-06-06T03:54:07.000000Z_old_file_same_timestamp_as_a.csv",
},
{
"data": {
@@ -1622,14 +1586,12 @@ multi_csv_remove_old_files_if_history_is_full_scenario_concurrent_cursor_is_olde
"stream": "stream1",
},
{
"stream1": {
"history": {
"old_file_same_timestamp_as_a.csv": "2023-06-06T03:54:07.000000Z",
"a.csv": "2023-06-06T03:54:07.000000Z",
"b.csv": "2023-06-07T03:54:07.000000Z",
},
"_ab_source_file_last_modified": "2023-06-07T03:54:07.000000Z_b.csv",
}
"history": {
"old_file_same_timestamp_as_a.csv": "2023-06-06T03:54:07.000000Z",
"a.csv": "2023-06-06T03:54:07.000000Z",
"b.csv": "2023-06-07T03:54:07.000000Z",
},
"_ab_source_file_last_modified": "2023-06-07T03:54:07.000000Z_b.csv",
},
{
"data": {
@@ -1652,14 +1614,12 @@ multi_csv_remove_old_files_if_history_is_full_scenario_concurrent_cursor_is_olde
"stream": "stream1",
},
{
"stream1": {
"history": {
"old_file_same_timestamp_as_a.csv": "2023-06-06T03:54:07.000000Z",
"b.csv": "2023-06-07T03:54:07.000000Z",
"c.csv": "2023-06-10T03:54:07.000000Z",
},
"_ab_source_file_last_modified": "2023-06-10T03:54:07.000000Z_c.csv",
}
"history": {
"old_file_same_timestamp_as_a.csv": "2023-06-06T03:54:07.000000Z",
"b.csv": "2023-06-07T03:54:07.000000Z",
"c.csv": "2023-06-10T03:54:07.000000Z",
},
"_ab_source_file_last_modified": "2023-06-10T03:54:07.000000Z_c.csv",
},
]
)
@@ -1848,14 +1808,12 @@ multi_csv_same_timestamp_more_files_than_history_size_scenario_concurrent_cursor
"stream": "stream1",
},
{
"stream1": {
"history": {
"b.csv": "2023-06-05T03:54:07.000000Z",
"c.csv": "2023-06-05T03:54:07.000000Z",
"d.csv": "2023-06-05T03:54:07.000000Z",
},
"_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z_d.csv",
}
"history": {
"b.csv": "2023-06-05T03:54:07.000000Z",
"c.csv": "2023-06-05T03:54:07.000000Z",
"d.csv": "2023-06-05T03:54:07.000000Z",
},
"_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z_d.csv",
},
]
)
@@ -2032,14 +1990,12 @@ multi_csv_same_timestamp_more_files_than_history_size_scenario_concurrent_cursor
"stream": "stream1",
},
{
"stream1": {
"history": {
"b.csv": "2023-06-05T03:54:07.000000Z",
"c.csv": "2023-06-05T03:54:07.000000Z",
"d.csv": "2023-06-05T03:54:07.000000Z",
},
"_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z_d.csv",
}
"history": {
"b.csv": "2023-06-05T03:54:07.000000Z",
"c.csv": "2023-06-05T03:54:07.000000Z",
"d.csv": "2023-06-05T03:54:07.000000Z",
},
"_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z_d.csv",
},
]
)
@@ -2138,14 +2094,12 @@ multi_csv_sync_recent_files_if_history_is_incomplete_scenario_concurrent_cursor_
.set_expected_records(
[
{
"stream1": {
"history": {
"b.csv": "2023-06-05T03:54:07.000000Z",
"c.csv": "2023-06-05T03:54:07.000000Z",
"d.csv": "2023-06-05T03:54:07.000000Z",
},
"_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z_d.csv",
}
"history": {
"b.csv": "2023-06-05T03:54:07.000000Z",
"c.csv": "2023-06-05T03:54:07.000000Z",
"d.csv": "2023-06-05T03:54:07.000000Z",
},
"_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z_d.csv",
}
]
)
@@ -2256,14 +2210,12 @@ multi_csv_sync_recent_files_if_history_is_incomplete_scenario_concurrent_cursor_
.set_expected_records(
[
{
"stream1": {
"history": {
"b.csv": "2023-06-05T03:54:07.000000Z",
"c.csv": "2023-06-05T03:54:07.000000Z",
"d.csv": "2023-06-05T03:54:07.000000Z",
},
"_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z_d.csv",
}
}
]
)
@@ -2397,14 +2349,12 @@ multi_csv_sync_files_within_time_window_if_history_is_incomplete__different_time
"stream": "stream1",
},
{
"stream1": {
"history": {
"c.csv": "2023-06-07T03:54:07.000000Z",
"d.csv": "2023-06-08T03:54:07.000000Z",
"e.csv": "2023-06-08T03:54:07.000000Z",
},
"_ab_source_file_last_modified": "2023-06-08T03:54:07.000000Z_e.csv",
}
"history": {
"c.csv": "2023-06-07T03:54:07.000000Z",
"d.csv": "2023-06-08T03:54:07.000000Z",
"e.csv": "2023-06-08T03:54:07.000000Z",
},
"_ab_source_file_last_modified": "2023-06-08T03:54:07.000000Z_e.csv",
},
]
)
@@ -2537,14 +2487,12 @@ multi_csv_sync_files_within_time_window_if_history_is_incomplete__different_time
"stream": "stream1",
},
{
"stream1": {
"history": {
"c.csv": "2023-06-07T03:54:07.000000Z",
"d.csv": "2023-06-08T03:54:07.000000Z",
"e.csv": "2023-06-08T03:54:07.000000Z",
},
"_ab_source_file_last_modified": "2023-06-08T03:54:07.000000Z_e.csv",
}
"history": {
"c.csv": "2023-06-07T03:54:07.000000Z",
"d.csv": "2023-06-08T03:54:07.000000Z",
"e.csv": "2023-06-08T03:54:07.000000Z",
},
"_ab_source_file_last_modified": "2023-06-08T03:54:07.000000Z_e.csv",
},
]
)
@@ -2675,14 +2623,12 @@ multi_csv_sync_files_within_history_time_window_if_history_is_incomplete_differe
"stream": "stream1",
},
{
"stream1": {
"history": {
"a.csv": "2023-06-05T03:54:07.000000Z",
"c.csv": "2023-06-07T03:54:07.000000Z",
"d.csv": "2023-06-08T03:54:07.000000Z",
},
"_ab_source_file_last_modified": "2023-06-08T03:54:07.000000Z_d.csv",
}
"history": {
"a.csv": "2023-06-05T03:54:07.000000Z",
"c.csv": "2023-06-07T03:54:07.000000Z",
"d.csv": "2023-06-08T03:54:07.000000Z",
},
"_ab_source_file_last_modified": "2023-06-08T03:54:07.000000Z_d.csv",
},
{
"data": {
@@ -2705,14 +2651,12 @@ multi_csv_sync_files_within_history_time_window_if_history_is_incomplete_differe
"stream": "stream1",
},
{
"stream1": {
"history": {
"b.csv": "2023-06-06T03:54:07.000000Z",
"c.csv": "2023-06-07T03:54:07.000000Z",
"d.csv": "2023-06-08T03:54:07.000000Z",
},
"_ab_source_file_last_modified": "2023-06-08T03:54:07.000000Z_d.csv",
}
"history": {
"b.csv": "2023-06-06T03:54:07.000000Z",
"c.csv": "2023-06-07T03:54:07.000000Z",
"d.csv": "2023-06-08T03:54:07.000000Z",
},
"_ab_source_file_last_modified": "2023-06-08T03:54:07.000000Z_d.csv",
},
]
)
@@ -2843,14 +2787,12 @@ multi_csv_sync_files_within_history_time_window_if_history_is_incomplete_differe
"stream": "stream1",
},
{
"stream1": {
"history": {
"a.csv": "2023-06-05T03:54:07.000000Z",
"c.csv": "2023-06-07T03:54:07.000000Z",
"d.csv": "2023-06-08T03:54:07.000000Z",
},
"_ab_source_file_last_modified": "2023-06-08T03:54:07.000000Z_d.csv",
}
"history": {
"a.csv": "2023-06-05T03:54:07.000000Z",
"c.csv": "2023-06-07T03:54:07.000000Z",
"d.csv": "2023-06-08T03:54:07.000000Z",
},
"_ab_source_file_last_modified": "2023-06-08T03:54:07.000000Z_d.csv",
},
{
"data": {
@@ -2873,14 +2815,12 @@ multi_csv_sync_files_within_history_time_window_if_history_is_incomplete_differe
"stream": "stream1",
},
{
"stream1": {
"history": {
"b.csv": "2023-06-06T03:54:07.000000Z",
"c.csv": "2023-06-07T03:54:07.000000Z",
"d.csv": "2023-06-08T03:54:07.000000Z",
},
"_ab_source_file_last_modified": "2023-06-08T03:54:07.000000Z_d.csv",
}
"history": {
"b.csv": "2023-06-06T03:54:07.000000Z",
"c.csv": "2023-06-07T03:54:07.000000Z",
"d.csv": "2023-06-08T03:54:07.000000Z",
},
"_ab_source_file_last_modified": "2023-06-08T03:54:07.000000Z_d.csv",
},
]
)

View File

@@ -73,10 +73,8 @@ single_csv_input_state_is_earlier_scenario = (
"stream": "stream1",
},
{
"stream1": {
"history": {"some_old_file.csv": "2023-06-01T03:54:07.000000Z", "a.csv": "2023-06-05T03:54:07.000000Z"},
"_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z_a.csv",
}
"history": {"some_old_file.csv": "2023-06-01T03:54:07.000000Z", "a.csv": "2023-06-05T03:54:07.000000Z"},
"_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z_a.csv",
},
]
)
@@ -154,10 +152,8 @@ single_csv_file_is_skipped_if_same_modified_at_as_in_history = (
.set_expected_records(
[
{
"stream1": {
"history": {"a.csv": "2023-06-05T03:54:07.000000Z"},
"_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z_a.csv",
}
"history": {"a.csv": "2023-06-05T03:54:07.000000Z"},
"_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z_a.csv",
}
]
)
@@ -253,10 +249,8 @@ single_csv_file_is_synced_if_modified_at_is_more_recent_than_in_history = (
"stream": "stream1",
},
{
"stream1": {
"history": {"a.csv": "2023-06-05T03:54:07.000000Z"},
"_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z_a.csv",
}
"history": {"a.csv": "2023-06-05T03:54:07.000000Z"},
"_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z_a.csv",
},
]
)
@@ -365,10 +359,8 @@ single_csv_no_input_state_scenario = (
"stream": "stream1",
},
{
"stream1": {
"history": {"a.csv": "2023-06-05T03:54:07.000000Z"},
"_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z_a.csv",
}
"history": {"a.csv": "2023-06-05T03:54:07.000000Z"},
"_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z_a.csv",
},
]
)
@@ -488,10 +480,8 @@ multi_csv_same_timestamp_scenario = (
"stream": "stream1",
},
{
"stream1": {
"history": {"a.csv": "2023-06-05T03:54:07.000000Z", "b.csv": "2023-06-05T03:54:07.000000Z"},
"_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z_b.csv",
}
"history": {"a.csv": "2023-06-05T03:54:07.000000Z", "b.csv": "2023-06-05T03:54:07.000000Z"},
"_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z_b.csv",
},
]
)
@@ -580,13 +570,11 @@ single_csv_input_state_is_later_scenario = (
"stream": "stream1",
},
{
"stream1": {
"history": {
"recent_file.csv": "2023-07-15T23:59:59.000000Z",
"a.csv": "2023-06-05T03:54:07.000000Z",
},
"_ab_source_file_last_modified": "2023-07-15T23:59:59.000000Z_recent_file.csv",
}
"history": {
"recent_file.csv": "2023-07-15T23:59:59.000000Z",
"a.csv": "2023-06-05T03:54:07.000000Z",
},
"_ab_source_file_last_modified": "2023-07-15T23:59:59.000000Z_recent_file.csv",
},
]
)
@@ -693,12 +681,10 @@ multi_csv_different_timestamps_scenario = (
"stream": "stream1",
},
{
"stream1": {
"history": {
"a.csv": "2023-06-04T03:54:07.000000Z",
},
"_ab_source_file_last_modified": "2023-06-04T03:54:07.000000Z_a.csv",
}
"history": {
"a.csv": "2023-06-04T03:54:07.000000Z",
},
"_ab_source_file_last_modified": "2023-06-04T03:54:07.000000Z_a.csv",
},
{
"data": {
@@ -721,10 +707,8 @@ multi_csv_different_timestamps_scenario = (
"stream": "stream1",
},
{
"stream1": {
"history": {"a.csv": "2023-06-04T03:54:07.000000Z", "b.csv": "2023-06-05T03:54:07.000000Z"},
"_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z_b.csv",
}
"history": {"a.csv": "2023-06-04T03:54:07.000000Z", "b.csv": "2023-06-05T03:54:07.000000Z"},
"_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z_b.csv",
},
]
)
@@ -852,10 +836,8 @@ multi_csv_per_timestamp_scenario = (
"stream": "stream1",
},
{
"stream1": {
"history": {"a.csv": "2023-06-05T03:54:07.000000Z", "b.csv": "2023-06-05T03:54:07.000000Z"},
"_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z_b.csv",
}
"history": {"a.csv": "2023-06-05T03:54:07.000000Z", "b.csv": "2023-06-05T03:54:07.000000Z"},
"_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z_b.csv",
},
{
"data": {
@@ -878,14 +860,12 @@ multi_csv_per_timestamp_scenario = (
"stream": "stream1",
},
{
"stream1": {
"history": {
"a.csv": "2023-06-05T03:54:07.000000Z",
"b.csv": "2023-06-05T03:54:07.000000Z",
"c.csv": "2023-06-06T03:54:07.000000Z",
},
"_ab_source_file_last_modified": "2023-06-06T03:54:07.000000Z_c.csv",
}
"history": {
"a.csv": "2023-06-05T03:54:07.000000Z",
"b.csv": "2023-06-05T03:54:07.000000Z",
"c.csv": "2023-06-06T03:54:07.000000Z",
},
"_ab_source_file_last_modified": "2023-06-06T03:54:07.000000Z_c.csv",
},
]
)
@@ -997,10 +977,8 @@ multi_csv_skip_file_if_already_in_history = (
"stream": "stream1",
},
{
"stream1": {
"history": {"a.csv": "2023-06-05T03:54:07.000000Z", "b.csv": "2023-06-05T03:54:07.000000Z"},
"_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z_b.csv",
}
"history": {"a.csv": "2023-06-05T03:54:07.000000Z", "b.csv": "2023-06-05T03:54:07.000000Z"},
"_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z_b.csv",
},
{
"data": {
@@ -1023,14 +1001,12 @@ multi_csv_skip_file_if_already_in_history = (
"stream": "stream1",
},
{
"stream1": {
"history": {
"a.csv": "2023-06-05T03:54:07.000000Z",
"b.csv": "2023-06-05T03:54:07.000000Z",
"c.csv": "2023-06-06T03:54:07.000000Z",
},
"_ab_source_file_last_modified": "2023-06-06T03:54:07.000000Z_c.csv",
}
"history": {
"a.csv": "2023-06-05T03:54:07.000000Z",
"b.csv": "2023-06-05T03:54:07.000000Z",
"c.csv": "2023-06-06T03:54:07.000000Z",
},
"_ab_source_file_last_modified": "2023-06-06T03:54:07.000000Z_c.csv",
},
]
)
@@ -1151,14 +1127,12 @@ multi_csv_include_missing_files_within_history_range = (
# {"data": {"col1": "val11c", "col2": "val12c", "col3": "val13c"}, "stream": "stream1"}, # this file is skipped
# {"data": {"col1": "val21c", "col2": "val22c", "col3": "val23c"}, "stream": "stream1"}, # this file is skipped
{
"stream1": {
"history": {
"a.csv": "2023-06-05T03:54:07.000000Z",
"b.csv": "2023-06-05T03:54:07.000000Z",
"c.csv": "2023-06-06T03:54:07.000000Z",
},
"_ab_source_file_last_modified": "2023-06-06T03:54:07.000000Z_c.csv",
}
"history": {
"a.csv": "2023-06-05T03:54:07.000000Z",
"b.csv": "2023-06-05T03:54:07.000000Z",
"c.csv": "2023-06-06T03:54:07.000000Z",
},
"_ab_source_file_last_modified": "2023-06-06T03:54:07.000000Z_c.csv",
},
]
)
@@ -1273,14 +1247,12 @@ multi_csv_remove_old_files_if_history_is_full_scenario = (
"stream": "stream1",
},
{
"stream1": {
"history": {
"very_old_file.csv": "2023-06-02T03:54:07.000000Z",
"old_file_same_timestamp_as_a.csv": "2023-06-06T03:54:07.000000Z",
"a.csv": "2023-06-06T03:54:07.000000Z",
},
"_ab_source_file_last_modified": "2023-06-06T03:54:07.000000Z_old_file_same_timestamp_as_a.csv",
}
"history": {
"very_old_file.csv": "2023-06-02T03:54:07.000000Z",
"old_file_same_timestamp_as_a.csv": "2023-06-06T03:54:07.000000Z",
"a.csv": "2023-06-06T03:54:07.000000Z",
},
"_ab_source_file_last_modified": "2023-06-06T03:54:07.000000Z_old_file_same_timestamp_as_a.csv",
},
{
"data": {
@@ -1303,14 +1275,12 @@ multi_csv_remove_old_files_if_history_is_full_scenario = (
"stream": "stream1",
},
{
"stream1": {
"history": {
"old_file_same_timestamp_as_a.csv": "2023-06-06T03:54:07.000000Z",
"a.csv": "2023-06-06T03:54:07.000000Z",
"b.csv": "2023-06-07T03:54:07.000000Z",
},
"_ab_source_file_last_modified": "2023-06-07T03:54:07.000000Z_b.csv",
}
"history": {
"old_file_same_timestamp_as_a.csv": "2023-06-06T03:54:07.000000Z",
"a.csv": "2023-06-06T03:54:07.000000Z",
"b.csv": "2023-06-07T03:54:07.000000Z",
},
"_ab_source_file_last_modified": "2023-06-07T03:54:07.000000Z_b.csv",
},
{
"data": {
@@ -1333,14 +1303,12 @@ multi_csv_remove_old_files_if_history_is_full_scenario = (
"stream": "stream1",
},
{
"stream1": {
"history": {
"old_file_same_timestamp_as_a.csv": "2023-06-06T03:54:07.000000Z",
"b.csv": "2023-06-07T03:54:07.000000Z",
"c.csv": "2023-06-10T03:54:07.000000Z",
},
"_ab_source_file_last_modified": "2023-06-10T03:54:07.000000Z_c.csv",
}
"history": {
"old_file_same_timestamp_as_a.csv": "2023-06-06T03:54:07.000000Z",
"b.csv": "2023-06-07T03:54:07.000000Z",
"c.csv": "2023-06-10T03:54:07.000000Z",
},
"_ab_source_file_last_modified": "2023-06-10T03:54:07.000000Z_c.csv",
},
]
)
@@ -1528,14 +1496,12 @@ multi_csv_same_timestamp_more_files_than_history_size_scenario = (
"stream": "stream1",
},
{
"stream1": {
"history": {
"b.csv": "2023-06-05T03:54:07.000000Z",
"c.csv": "2023-06-05T03:54:07.000000Z",
"d.csv": "2023-06-05T03:54:07.000000Z",
},
"_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z_d.csv",
}
"history": {
"b.csv": "2023-06-05T03:54:07.000000Z",
"c.csv": "2023-06-05T03:54:07.000000Z",
"d.csv": "2023-06-05T03:54:07.000000Z",
},
"_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z_d.csv",
},
]
)
@@ -1634,14 +1600,12 @@ multi_csv_sync_recent_files_if_history_is_incomplete_scenario = (
.set_expected_records(
[
{
"stream1": {
"history": {
"b.csv": "2023-06-05T03:54:07.000000Z",
"c.csv": "2023-06-05T03:54:07.000000Z",
"d.csv": "2023-06-05T03:54:07.000000Z",
},
"_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z_d.csv",
}
"history": {
"b.csv": "2023-06-05T03:54:07.000000Z",
"c.csv": "2023-06-05T03:54:07.000000Z",
"d.csv": "2023-06-05T03:54:07.000000Z",
},
"_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z_d.csv",
}
]
)
@@ -1773,14 +1737,12 @@ multi_csv_sync_files_within_time_window_if_history_is_incomplete__different_time
"stream": "stream1",
},
{
"stream1": {
"history": {
"c.csv": "2023-06-07T03:54:07.000000Z",
"d.csv": "2023-06-08T03:54:07.000000Z",
"e.csv": "2023-06-08T03:54:07.000000Z",
},
"_ab_source_file_last_modified": "2023-06-08T03:54:07.000000Z_e.csv",
}
"history": {
"c.csv": "2023-06-07T03:54:07.000000Z",
"d.csv": "2023-06-08T03:54:07.000000Z",
"e.csv": "2023-06-08T03:54:07.000000Z",
},
"_ab_source_file_last_modified": "2023-06-08T03:54:07.000000Z_e.csv",
},
]
)
@@ -1908,14 +1870,12 @@ multi_csv_sync_files_within_history_time_window_if_history_is_incomplete_differe
"stream": "stream1",
},
{
"stream1": {
"history": {
"a.csv": "2023-06-05T03:54:07.000000Z",
"c.csv": "2023-06-07T03:54:07.000000Z",
"d.csv": "2023-06-08T03:54:07.000000Z",
},
"_ab_source_file_last_modified": "2023-06-08T03:54:07.000000Z_d.csv",
}
"history": {
"a.csv": "2023-06-05T03:54:07.000000Z",
"c.csv": "2023-06-07T03:54:07.000000Z",
"d.csv": "2023-06-08T03:54:07.000000Z",
},
"_ab_source_file_last_modified": "2023-06-08T03:54:07.000000Z_d.csv",
},
{
"data": {
@@ -1938,14 +1898,12 @@ multi_csv_sync_files_within_history_time_window_if_history_is_incomplete_differe
"stream": "stream1",
},
{
"stream1": {
"history": {
"b.csv": "2023-06-06T03:54:07.000000Z",
"c.csv": "2023-06-07T03:54:07.000000Z",
"d.csv": "2023-06-08T03:54:07.000000Z",
},
"_ab_source_file_last_modified": "2023-06-08T03:54:07.000000Z_d.csv",
}
"history": {
"b.csv": "2023-06-06T03:54:07.000000Z",
"c.csv": "2023-06-07T03:54:07.000000Z",
"d.csv": "2023-06-08T03:54:07.000000Z",
},
"_ab_source_file_last_modified": "2023-06-08T03:54:07.000000Z_d.csv",
},
]
)

View File

@@ -226,25 +226,14 @@ class StreamFacadeTest(unittest.TestCase):
assert actual_stream_data == expected_stream_data
def test_read_records_full_refresh(self):
def test_read_records(self):
expected_stream_data = [{"data": 1}, {"data": 2}]
records = [Record(data, "stream") for data in expected_stream_data]
partition = Mock()
partition.read.return_value = records
self._abstract_stream.generate_partitions.return_value = [partition]
actual_stream_data = list(self._facade.read_full_refresh(None, None, None))
assert actual_stream_data == expected_stream_data
def test_read_records_incremental(self):
expected_stream_data = [{"data": 1}, {"data": 2}]
records = [Record(data, "stream") for data in expected_stream_data]
partition = Mock()
partition.read.return_value = records
self._abstract_stream.generate_partitions.return_value = [partition]
actual_stream_data = list(self._facade.read_incremental(None, None, None, None, None, None, None))
actual_stream_data = list(self._facade.read(None, None, None, None, None, None))
assert actual_stream_data == expected_stream_data

View File

@@ -182,7 +182,7 @@ def test_add_file(
uri: RemoteFile(uri=uri, last_modified=datetime.strptime(timestamp, DATE_TIME_FORMAT)) for uri, timestamp in expected_pending_files
}
assert (
mock_message_repository.emit_message.call_args_list[0].args[0].state.data["test"]["_ab_source_file_last_modified"]
mock_message_repository.emit_message.call_args_list[0].args[0].state.stream.stream_state._ab_source_file_last_modified
== expected_cursor_value
)
@@ -233,7 +233,7 @@ def test_add_file_invalid(
}
assert mock_message_repository.emit_message.call_args_list[0].args[0].log.level.value == "WARN"
assert (
mock_message_repository.emit_message.call_args_list[1].args[0].state.data["test"]["_ab_source_file_last_modified"]
mock_message_repository.emit_message.call_args_list[1].args[0].state.stream.stream_state._ab_source_file_last_modified
== expected_cursor_value
)
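Side by side, the assertion change above amounts to reading the cursor from the per-stream blob instead of the legacy data dict (a sketch reusing the same mock as the test):

emitted = mock_message_repository.emit_message.call_args_list[0].args[0]
# Removed legacy-format lookup: emitted.state.data["test"]["_ab_source_file_last_modified"]
# New per-stream lookup: the cursor lives on the AirbyteStateBlob attached to the stream descriptor.
cursor_value = emitted.state.stream.stream_state._ab_source_file_last_modified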

View File

@@ -109,10 +109,10 @@ def _verify_read_output(output: EntrypointOutput, scenario: TestScenario[Abstrac
if hasattr(scenario.source, "cursor_cls") and issubclass(scenario.source.cursor_cls, AbstractConcurrentFileBasedCursor):
# Only check the last state emitted because we don't know the order the others will be in.
# This may be needed for non-file-based concurrent scenarios too.
assert states[-1].state.data == expected_states[-1]
assert states[-1].state.stream.stream_state.dict() == expected_states[-1]
else:
for actual, expected in zip(states, expected_states): # states should be emitted in sorted order
assert actual.state.data == expected
assert actual.state.stream.stream_state.dict() == expected
if scenario.expected_logs:
read_logs = scenario.expected_logs.get("read")

View File

@@ -0,0 +1,325 @@
#
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
#
import logging
from abc import ABC
from datetime import datetime, timezone
from typing import Any, Dict, Iterable, List, Mapping, MutableMapping, Optional, Tuple
import pendulum
import requests
from airbyte_cdk.sources import AbstractSource, Source
from airbyte_cdk.sources.streams import IncrementalMixin, Stream
from airbyte_cdk.sources.streams.core import StreamData
from airbyte_cdk.sources.streams.http import HttpStream
from airbyte_cdk.sources.streams.http.availability_strategy import HttpAvailabilityStrategy
from airbyte_protocol.models import ConnectorSpecification, SyncMode
from requests import HTTPError
class FixtureAvailabilityStrategy(HttpAvailabilityStrategy):
"""
Inherit from HttpAvailabilityStrategy with slight modification to 403 error message.
"""
def reasons_for_unavailable_status_codes(self, stream: Stream, logger: logging.Logger, source: Source, error: HTTPError) -> Dict[int, str]:
reasons_for_codes: Dict[int, str] = {
requests.codes.FORBIDDEN: "This is likely due to insufficient permissions for your Notion integration. "
"Please make sure your integration has read access for the resources you are trying to sync"
}
return reasons_for_codes
class IntegrationStream(HttpStream, ABC):
url_base = "https://api.airbyte-test.com/v1/"
primary_key = "id"
page_size = 100
raise_on_http_errors = True
current_page = 0
def __init__(self, config: Mapping[str, Any], **kwargs):
super().__init__(**kwargs)
self.start_date = config.get("start_date")
@property
def availability_strategy(self) -> HttpAvailabilityStrategy:
return FixtureAvailabilityStrategy()
def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
data = response.json().get("data", [])
yield from data
def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
has_more = response.json().get("has_more")
if has_more:
self.current_page += 1
return {"next_page": self.current_page}
class IncrementalIntegrationStream(IntegrationStream, IncrementalMixin, ABC):
cursor_field = "created_at"
_state = {}
@property
def state(self) -> MutableMapping[str, Any]:
return self._state
@state.setter
def state(self, value: MutableMapping[str, Any]) -> None:
self._state = value
def read_records(
self,
sync_mode: SyncMode,
cursor_field: Optional[List[str]] = None,
stream_slice: Optional[Mapping[str, Any]] = None,
stream_state: Optional[Mapping[str, Any]] = None,
) -> Iterable[StreamData]:
for record in super().read_records(sync_mode, cursor_field, stream_slice, stream_state):
self.state = {self.cursor_field: record.get(self.cursor_field)}
yield record
class Users(IntegrationStream):
def path(self, **kwargs) -> str:
return "users"
def get_json_schema(self) -> Mapping[str, Any]:
return {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"additionalProperties": True,
"properties": {
"type": {
"type": "string"
},
"id": {
"type": "string"
},
"created_at": {
"type": "string",
"format": "date-time"
},
"first_name": {
"type": "string"
},
"last_name": {
"type": "string"
}
}
}
class Planets(IncrementalIntegrationStream):
def __init__(self, **kwargs):
super().__init__(**kwargs)
self._state: MutableMapping[str, Any] = {}
def path(self, **kwargs) -> str:
return "planets"
def get_json_schema(self) -> Mapping[str, Any]:
return {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"additionalProperties": True,
"properties": {
"type": {
"type": "string"
},
"id": {
"type": "string"
},
"created_at": {
"type": "string",
"format": "date-time"
},
"name": {
"type": "string"
}
}
}
def request_params(
self,
stream_state: Optional[Mapping[str, Any]],
stream_slice: Optional[Mapping[str, Any]] = None,
next_page_token: Optional[Mapping[str, Any]] = None,
) -> MutableMapping[str, Any]:
return {
"start_date": stream_slice.get("start_date"),
"end_date": stream_slice.get("end_date")
}
def stream_slices(
self, *, sync_mode: SyncMode, cursor_field: Optional[List[str]] = None, stream_state: Optional[Mapping[str, Any]] = None
) -> Iterable[Optional[Mapping[str, Any]]]:
start_date = pendulum.parse(self.start_date)
if stream_state:
start_date = pendulum.parse(stream_state.get(self.cursor_field))
date_slices = []
end_date = datetime.now(timezone.utc).replace(microsecond=0)
while start_date < end_date:
end_date_slice = min(start_date.add(days=7), end_date)
date_slice = {"start_date": start_date.strftime("%Y-%m-%dT%H:%M:%SZ"), "end_date": end_date_slice.strftime("%Y-%m-%dT%H:%M:%SZ")}
date_slices.append(date_slice)
start_date = end_date_slice
return date_slices
class Legacies(IntegrationStream):
"""
Incremental stream that uses the legacy method get_updated_state() to manage stream state. New connectors use the state
property and setter methods.
"""
cursor_field = "created_at"
def path(self, **kwargs) -> str:
return "legacies"
def get_json_schema(self) -> Mapping[str, Any]:
return {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"additionalProperties": True,
"properties": {
"type": {
"type": "string"
},
"id": {
"type": "string"
},
"created_at": {
"type": "string",
"format": "date-time"
},
"quote": {
"type": "string"
}
}
}
def get_updated_state(
self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]
) -> MutableMapping[str, Any]:
latest_state = latest_record.get(self.cursor_field)
current_state = current_stream_state.get(self.cursor_field) or latest_state
if current_state:
return {self.cursor_field: max(latest_state, current_state)}
return {}
def read_records(
self,
sync_mode: SyncMode,
cursor_field: Optional[List[str]] = None,
stream_slice: Optional[Mapping[str, Any]] = None,
stream_state: Optional[Mapping[str, Any]] = None,
) -> Iterable[StreamData]:
yield from super().read_records(sync_mode, cursor_field, stream_slice, stream_state)
def request_params(
self,
stream_state: Optional[Mapping[str, Any]],
stream_slice: Optional[Mapping[str, Any]] = None,
next_page_token: Optional[Mapping[str, Any]] = None,
) -> MutableMapping[str, Any]:
return {
"start_date": stream_slice.get("start_date"),
"end_date": stream_slice.get("end_date")
}
def stream_slices(
self, *, sync_mode: SyncMode, cursor_field: Optional[List[str]] = None, stream_state: Optional[Mapping[str, Any]] = None
) -> Iterable[Optional[Mapping[str, Any]]]:
start_date = pendulum.parse(self.start_date)
if stream_state:
start_date = pendulum.parse(stream_state.get(self.cursor_field))
date_slices = []
end_date = datetime.now(timezone.utc).replace(microsecond=0)
while start_date < end_date:
end_date_slice = min(start_date.add(days=7), end_date)
date_slice = {"start_date": start_date.strftime("%Y-%m-%dT%H:%M:%SZ"), "end_date": end_date_slice.strftime("%Y-%m-%dT%H:%M:%SZ")}
date_slices.append(date_slice)
start_date = end_date_slice
return date_slices
class Dividers(IntegrationStream):
def path(self, **kwargs) -> str:
return "dividers"
def get_json_schema(self) -> Mapping[str, Any]:
return {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"additionalProperties": True,
"properties": {
"type": {
"type": "string"
},
"id": {
"type": "string"
},
"created_at": {
"type": "string",
"format": "date-time"
},
"divide_category": {
"type": "string"
}
}
}
def stream_slices(
self, *, sync_mode: SyncMode, cursor_field: Optional[List[str]] = None, stream_state: Optional[Mapping[str, Any]] = None
) -> Iterable[Optional[Mapping[str, Any]]]:
return [{"divide_category": "dukes"}, {"divide_category": "mentats"}]
def request_params(
self,
stream_state: Optional[Mapping[str, Any]],
stream_slice: Optional[Mapping[str, Any]] = None,
next_page_token: Optional[Mapping[str, Any]] = None,
) -> MutableMapping[str, Any]:
return {"category": stream_slice.get("divide_category")}
class SourceFixture(AbstractSource):
def check_connection(self, logger: logging.Logger, config: Mapping[str, Any]) -> Tuple[bool, Any]:
return True, None
def streams(self, config: Mapping[str, Any]) -> List[Stream]:
return [Dividers(config=config), Legacies(config=config), Planets(config=config), Users(config=config)]
def spec(self, logger: logging.Logger) -> ConnectorSpecification:
return ConnectorSpecification(
connectionSpecification={
"properties": {
"start_date": {
"title": "Start Date",
"description": "UTC date and time in the format YYYY-MM-DDTHH:MM:SS.000Z. During incremental sync, any data generated before this date will not be replicated. If left blank, the start date will be set to 2 years before the present date.",
"pattern": "^[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}Z$",
"pattern_descriptor": "YYYY-MM-DDTHH:MM:SS.000Z",
"examples": ["2020-11-16T00:00:00.000Z"],
"type": "string",
"format": "date-time"
}
}
}
)

View File

@@ -0,0 +1,417 @@
#
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
#
from datetime import datetime, timedelta, timezone
from typing import List, Optional
from unittest import TestCase
import freezegun
import pytest
from airbyte_cdk.models import AirbyteMessage, ConfiguredAirbyteCatalog, SyncMode, Type
from airbyte_cdk.test.catalog_builder import CatalogBuilder
from airbyte_cdk.test.entrypoint_wrapper import read
from airbyte_cdk.test.mock_http import HttpMocker, HttpRequest
from airbyte_cdk.test.mock_http.response_builder import (
FieldPath,
HttpResponseBuilder,
RecordBuilder,
create_record_builder,
create_response_builder,
)
from airbyte_protocol.models import AirbyteStreamStatus
from unit_tests.sources.mock_server_tests.mock_source_fixture import SourceFixture
_NOW = datetime.now(timezone.utc)
class RequestBuilder:
@classmethod
def dividers_endpoint(cls) -> "RequestBuilder":
return cls("dividers")
@classmethod
def legacies_endpoint(cls) -> "RequestBuilder":
return cls("legacies")
@classmethod
def planets_endpoint(cls) -> "RequestBuilder":
return cls("planets")
@classmethod
def users_endpoint(cls) -> "RequestBuilder":
return cls("users")
def __init__(self, resource: str) -> None:
self._resource = resource
self._start_date: Optional[datetime] = None
self._end_date: Optional[datetime] = None
self._category: Optional[str] = None
def with_start_date(self, start_date: datetime) -> "RequestBuilder":
self._start_date = start_date
return self
def with_end_date(self, end_date: datetime) -> "RequestBuilder":
self._end_date = end_date
return self
def with_category(self, category: str) -> "RequestBuilder":
self._category = category
return self
def build(self) -> HttpRequest:
query_params = {}
if self._start_date:
query_params["start_date"] = self._start_date.strftime("%Y-%m-%dT%H:%M:%SZ")
if self._end_date:
query_params["end_date"] = self._end_date.strftime("%Y-%m-%dT%H:%M:%SZ")
if self._category:
query_params["category"] = self._category
return HttpRequest(
url=f"https://api.airbyte-test.com/v1/{self._resource}",
query_params=query_params,
)
def _create_catalog(names_and_sync_modes: List[tuple[str, SyncMode]]) -> ConfiguredAirbyteCatalog:
catalog_builder = CatalogBuilder()
for stream_name, sync_mode in names_and_sync_modes:
catalog_builder.with_stream(name=stream_name, sync_mode=sync_mode)
return catalog_builder.build()
def _create_dividers_request() -> RequestBuilder:
return RequestBuilder.dividers_endpoint()
def _create_legacies_request() -> RequestBuilder:
return RequestBuilder.legacies_endpoint()
def _create_planets_request() -> RequestBuilder:
return RequestBuilder.planets_endpoint()
def _create_users_request() -> RequestBuilder:
return RequestBuilder.users_endpoint()
RESPONSE_TEMPLATE = {
"object": "list",
"has_more": False,
"data": [
{
"id": "123",
"created_at": "2024-01-01T07:04:28.000Z"
}
]
}
USER_TEMPLATE = {
"object": "list",
"has_more": False,
"data": [
{
"id": "123",
"created_at": "2024-01-01T07:04:28",
"first_name": "Paul",
"last_name": "Atreides",
}
]
}
PLANET_TEMPLATE = {
"object": "list",
"has_more": False,
"data": [
{
"id": "456",
"created_at": "2024-01-01T07:04:28.000Z",
"name": "Giedi Prime",
}
]
}
LEGACY_TEMPLATE = {
"object": "list",
"has_more": False,
"data": [
{
"id": "l3g4cy",
"created_at": "2024-02-01T07:04:28.000Z",
"quote": "What do you leave behind?",
}
]
}
DIVIDER_TEMPLATE = {
"object": "list",
"has_more": False,
"data": [
{
"id": "l3t0",
"created_at": "2024-02-01T07:04:28.000Z",
"divide_category": "dukes",
}
]
}
RESOURCE_TO_TEMPLATE = {
"dividers": DIVIDER_TEMPLATE,
"legacies": LEGACY_TEMPLATE,
"planets": PLANET_TEMPLATE,
"users": USER_TEMPLATE,
}
def _create_response() -> HttpResponseBuilder:
return create_response_builder(
response_template=RESPONSE_TEMPLATE,
records_path=FieldPath("data"),
)
def _create_record(resource: str) -> RecordBuilder:
return create_record_builder(
response_template=RESOURCE_TO_TEMPLATE.get(resource),
records_path=FieldPath("data"),
record_id_path=FieldPath("id"),
record_cursor_path=FieldPath("created_at"),
)
class FullRefreshStreamTest(TestCase):
@HttpMocker()
def test_full_refresh_sync(self, http_mocker):
start_datetime = _NOW - timedelta(days=14)
config = {
"start_date": start_datetime.strftime("%Y-%m-%dT%H:%M:%SZ")
}
http_mocker.get(
_create_users_request().build(),
_create_response().with_record(record=_create_record("users")).with_record(record=_create_record("users")).build(),
)
source = SourceFixture()
actual_messages = read(source, config=config, catalog=_create_catalog([("users", SyncMode.full_refresh)]))
assert emits_successful_sync_status_messages(actual_messages.get_stream_statuses("users"))
assert len(actual_messages.records) == 2
assert len(actual_messages.state_messages) == 1
validate_message_order([Type.RECORD, Type.RECORD, Type.STATE], actual_messages.records_and_state_messages)
assert actual_messages.state_messages[0].state.stream.stream_descriptor.name == "users"
assert actual_messages.state_messages[0].state.stream.stream_state == {"__ab_full_refresh_state_message": True}
@HttpMocker()
def test_full_refresh_with_slices(self, http_mocker):
start_datetime = _NOW - timedelta(days=14)
config = {
"start_date": start_datetime.strftime("%Y-%m-%dT%H:%M:%SZ")
}
http_mocker.get(
_create_dividers_request().with_category("dukes").build(),
_create_response().with_record(record=_create_record("dividers")).with_record(record=_create_record("dividers")).build(),
)
http_mocker.get(
_create_dividers_request().with_category("mentats").build(),
_create_response().with_record(record=_create_record("dividers")).with_record(record=_create_record("dividers")).build(),
)
source = SourceFixture()
actual_messages = read(source, config=config, catalog=_create_catalog([("dividers", SyncMode.full_refresh)]))
assert emits_successful_sync_status_messages(actual_messages.get_stream_statuses("dividers"))
assert len(actual_messages.records) == 4
assert len(actual_messages.state_messages) == 1
validate_message_order([Type.RECORD, Type.RECORD, Type.RECORD, Type.RECORD, Type.STATE], actual_messages.records_and_state_messages)
assert actual_messages.state_messages[0].state.stream.stream_descriptor.name == "dividers"
assert actual_messages.state_messages[0].state.stream.stream_state == {"__ab_full_refresh_state_message": True}
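# Illustrative only (not part of this commit): both tests above expect a full refresh sync to end
# with exactly one STATE message whose payload is the sentinel {"__ab_full_refresh_state_message": True}
# rather than a cursor value. A hypothetical helper like the one below is one way a test could tell the
# sentinel apart from real incremental state, assuming the state payload is exposed as a plain mapping.
def _is_full_refresh_sentinel(stream_state: dict) -> bool:
    return stream_state.get("__ab_full_refresh_state_message") is True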
@freezegun.freeze_time(_NOW)
class IncrementalStreamTest(TestCase):
@HttpMocker()
def test_incremental_sync(self, http_mocker):
start_datetime = _NOW - timedelta(days=14)
config = {
"start_date": start_datetime.strftime("%Y-%m-%dT%H:%M:%SZ")
}
last_record_date_0 = (start_datetime + timedelta(days=4)).strftime("%Y-%m-%dT%H:%M:%SZ")
http_mocker.get(
_create_planets_request().with_start_date(start_datetime).with_end_date(start_datetime + timedelta(days=7)).build(),
_create_response().with_record(record=_create_record("planets").with_cursor(last_record_date_0)).with_record(record=_create_record("planets").with_cursor(last_record_date_0)).with_record(record=_create_record("planets").with_cursor(last_record_date_0)).build(),
)
last_record_date_1 = (_NOW - timedelta(days=1)).strftime("%Y-%m-%dT%H:%M:%SZ")
http_mocker.get(
_create_planets_request().with_start_date(start_datetime + timedelta(days=7)).with_end_date(_NOW).build(),
_create_response().with_record(record=_create_record("planets").with_cursor(last_record_date_1)).with_record(record=_create_record("planets").with_cursor(last_record_date_1)).build(),
)
source = SourceFixture()
actual_messages = read(source, config=config, catalog=_create_catalog([("planets", SyncMode.incremental)]))
assert emits_successful_sync_status_messages(actual_messages.get_stream_statuses("planets"))
assert len(actual_messages.records) == 5
assert len(actual_messages.state_messages) == 2
validate_message_order([Type.RECORD, Type.RECORD, Type.RECORD, Type.STATE, Type.RECORD, Type.RECORD, Type.STATE], actual_messages.records_and_state_messages)
assert actual_messages.state_messages[0].state.stream.stream_descriptor.name == "planets"
assert actual_messages.state_messages[0].state.stream.stream_state == {"created_at": last_record_date_0}
assert actual_messages.state_messages[1].state.stream.stream_descriptor.name == "planets"
assert actual_messages.state_messages[1].state.stream.stream_state == {"created_at": last_record_date_1}
@HttpMocker()
def test_incremental_running_as_full_refresh(self, http_mocker):
start_datetime = _NOW - timedelta(days=14)
config = {
"start_date": start_datetime.strftime("%Y-%m-%dT%H:%M:%SZ")
}
last_record_date_0 = (start_datetime + timedelta(days=4)).strftime("%Y-%m-%dT%H:%M:%SZ")
http_mocker.get(
_create_planets_request().with_start_date(start_datetime).with_end_date(start_datetime + timedelta(days=7)).build(),
_create_response().with_record(record=_create_record("planets").with_cursor(last_record_date_0)).with_record(record=_create_record("planets").with_cursor(last_record_date_0)).with_record(record=_create_record("planets").with_cursor(last_record_date_0)).build(),
)
last_record_date_1 = (_NOW - timedelta(days=1)).strftime("%Y-%m-%dT%H:%M:%SZ")
http_mocker.get(
_create_planets_request().with_start_date(start_datetime + timedelta(days=7)).with_end_date(_NOW).build(),
_create_response().with_record(record=_create_record("planets").with_cursor(last_record_date_1)).with_record(record=_create_record("planets").with_cursor(last_record_date_1)).build(),
)
source = SourceFixture()
actual_messages = read(source, config=config, catalog=_create_catalog([("planets", SyncMode.full_refresh)]))
assert emits_successful_sync_status_messages(actual_messages.get_stream_statuses("planets"))
assert len(actual_messages.records) == 5
assert len(actual_messages.state_messages) == 1
validate_message_order([Type.RECORD, Type.RECORD, Type.RECORD, Type.RECORD, Type.RECORD, Type.STATE], actual_messages.records_and_state_messages)
assert actual_messages.state_messages[0].state.stream.stream_descriptor.name == "planets"
assert actual_messages.state_messages[0].state.stream.stream_state == {"created_at": last_record_date_1}
@HttpMocker()
def test_legacy_incremental_sync(self, http_mocker):
start_datetime = _NOW - timedelta(days=14)
config = {
"start_date": start_datetime.strftime("%Y-%m-%dT%H:%M:%SZ")
}
last_record_date_0 = (start_datetime + timedelta(days=4)).strftime("%Y-%m-%dT%H:%M:%SZ")
http_mocker.get(
_create_legacies_request().with_start_date(start_datetime).with_end_date(start_datetime + timedelta(days=7)).build(),
_create_response().with_record(record=_create_record("legacies").with_cursor(last_record_date_0)).with_record(record=_create_record("legacies").with_cursor(last_record_date_0)).with_record(record=_create_record("legacies").with_cursor(last_record_date_0)).build(),
)
last_record_date_1 = (_NOW - timedelta(days=1)).strftime("%Y-%m-%dT%H:%M:%SZ")
http_mocker.get(
_create_legacies_request().with_start_date(start_datetime + timedelta(days=7)).with_end_date(_NOW).build(),
_create_response().with_record(record=_create_record("legacies").with_cursor(last_record_date_1)).with_record(record=_create_record("legacies").with_cursor(last_record_date_1)).build(),
)
source = SourceFixture()
actual_messages = read(source, config=config, catalog=_create_catalog([("legacies", SyncMode.incremental)]))
assert emits_successful_sync_status_messages(actual_messages.get_stream_statuses("legacies"))
assert len(actual_messages.records) == 5
assert len(actual_messages.state_messages) == 2
validate_message_order([Type.RECORD, Type.RECORD, Type.RECORD, Type.STATE, Type.RECORD, Type.RECORD, Type.STATE], actual_messages.records_and_state_messages)
assert actual_messages.state_messages[0].state.stream.stream_descriptor.name == "legacies"
assert actual_messages.state_messages[0].state.stream.stream_state == {"created_at": last_record_date_0}
assert actual_messages.state_messages[1].state.stream.stream_descriptor.name == "legacies"
assert actual_messages.state_messages[1].state.stream.stream_state == {"created_at": last_record_date_1}
@freezegun.freeze_time(_NOW)
class MultipleStreamTest(TestCase):
@HttpMocker()
def test_incremental_and_full_refresh_streams(self, http_mocker):
start_datetime = _NOW - timedelta(days=14)
config = {
"start_date": start_datetime.strftime("%Y-%m-%dT%H:%M:%SZ")
}
# Mocks for users full refresh stream
http_mocker.get(
_create_users_request().build(),
_create_response().with_record(record=_create_record("users")).with_record(record=_create_record("users")).build(),
)
# Mocks for planets incremental stream
last_record_date_0 = (start_datetime + timedelta(days=4)).strftime("%Y-%m-%dT%H:%M:%SZ")
http_mocker.get(
_create_planets_request().with_start_date(start_datetime).with_end_date(start_datetime + timedelta(days=7)).build(),
_create_response().with_record(record=_create_record("planets").with_cursor(last_record_date_0)).with_record(record=_create_record("planets").with_cursor(last_record_date_0)).with_record(record=_create_record("planets").with_cursor(last_record_date_0)).build(),
)
last_record_date_1 = (_NOW - timedelta(days=1)).strftime("%Y-%m-%dT%H:%M:%SZ")
http_mocker.get(
_create_planets_request().with_start_date(start_datetime + timedelta(days=7)).with_end_date(_NOW).build(),
_create_response().with_record(record=_create_record("planets").with_cursor(last_record_date_1)).with_record(record=_create_record("planets").with_cursor(last_record_date_1)).build(),
)
# Mocks for dividers full refresh stream
http_mocker.get(
_create_dividers_request().with_category("dukes").build(),
_create_response().with_record(record=_create_record("dividers")).with_record(record=_create_record("dividers")).build(),
)
http_mocker.get(
_create_dividers_request().with_category("mentats").build(),
_create_response().with_record(record=_create_record("dividers")).with_record(record=_create_record("dividers")).build(),
)
source = SourceFixture()
actual_messages = read(source, config=config, catalog=_create_catalog([("users", SyncMode.full_refresh), ("planets", SyncMode.incremental), ("dividers", SyncMode.full_refresh)]))
assert emits_successful_sync_status_messages(actual_messages.get_stream_statuses("users"))
assert emits_successful_sync_status_messages(actual_messages.get_stream_statuses("planets"))
assert emits_successful_sync_status_messages(actual_messages.get_stream_statuses("dividers"))
assert len(actual_messages.records) == 11
assert len(actual_messages.state_messages) == 4
validate_message_order([
Type.RECORD,
Type.RECORD,
Type.STATE,
Type.RECORD,
Type.RECORD,
Type.RECORD,
Type.STATE,
Type.RECORD,
Type.RECORD,
Type.STATE,
Type.RECORD,
Type.RECORD,
Type.RECORD,
Type.RECORD,
Type.STATE
], actual_messages.records_and_state_messages)
assert actual_messages.state_messages[0].state.stream.stream_descriptor.name == "users"
assert actual_messages.state_messages[0].state.stream.stream_state == {"__ab_full_refresh_state_message": True}
assert actual_messages.state_messages[1].state.stream.stream_descriptor.name == "planets"
assert actual_messages.state_messages[1].state.stream.stream_state == {"created_at": last_record_date_0}
assert actual_messages.state_messages[2].state.stream.stream_descriptor.name == "planets"
assert actual_messages.state_messages[2].state.stream.stream_state == {"created_at": last_record_date_1}
assert actual_messages.state_messages[3].state.stream.stream_descriptor.name == "dividers"
assert actual_messages.state_messages[3].state.stream.stream_state == {"__ab_full_refresh_state_message": True}
def emits_successful_sync_status_messages(status_messages: List[AirbyteStreamStatus]) -> bool:
return (len(status_messages) == 3 and status_messages[0] == AirbyteStreamStatus.STARTED
and status_messages[1] == AirbyteStreamStatus.RUNNING and status_messages[2] == AirbyteStreamStatus.COMPLETE)
def validate_message_order(expected_message_order: List[Type], messages: List[AirbyteMessage]):
if len(expected_message_order) != len(messages):
pytest.fail(f"Expected message order count {len(expected_message_order)} did not match actual messages {len(messages)}")
for i, message in enumerate(messages):
if message.type != expected_message_order[i]:
pytest.fail(f"At index {i} actual message type {message.type.name} did not match expected message type {expected_message_order[i].name}")

View File

@@ -73,11 +73,11 @@ test_incremental_stream_with_slice_boundaries_no_input_state = (
[
{"data": {"id": "1", "cursor_field": 0}, "stream": "stream1"},
{"data": {"id": "2", "cursor_field": 1}, "stream": "stream1"},
{"stream1": {"cursor_field": 1}},
{"cursor_field": 1},
{"data": {"id": "3", "cursor_field": 2}, "stream": "stream1"},
{"data": {"id": "4", "cursor_field": 3}, "stream": "stream1"},
{"stream1": {"cursor_field": 2}},
{"stream1": {"cursor_field": 2}}, # see Cursor.ensure_at_least_one_state_emitted
{"cursor_field": 2},
{"cursor_field": 2}, # see Cursor.ensure_at_least_one_state_emitted
]
)
.set_log_levels({"ERROR", "WARN", "WARNING", "INFO", "DEBUG"})
@@ -150,11 +150,11 @@ test_incremental_stream_with_slice_boundaries_with_legacy_state = (
[
{"data": {"id": "1", "cursor_field": 0}, "stream": "stream1"},
{"data": {"id": "2", "cursor_field": 1}, "stream": "stream1"},
{"stream1": {"cursor_field": 1}},
{"cursor_field": 1},
{"data": {"id": "3", "cursor_field": 2}, "stream": "stream1"},
{"data": {"id": "4", "cursor_field": 3}, "stream": "stream1"},
{"stream1": {"cursor_field": 2}},
{"stream1": {"cursor_field": 2}}, # see Cursor.ensure_at_least_one_state_emitted
{"cursor_field": 2},
{"cursor_field": 2}, # see Cursor.ensure_at_least_one_state_emitted
]
)
.set_log_levels({"ERROR", "WARN", "WARNING", "INFO", "DEBUG"})
@@ -237,11 +237,11 @@ test_incremental_stream_with_slice_boundaries_with_concurrent_state = (
[
{"data": {"id": "1", "cursor_field": 0}, "stream": "stream1"},
{"data": {"id": "2", "cursor_field": 1}, "stream": "stream1"},
{"stream1": {"cursor_field": 1}},
{"cursor_field": 1},
{"data": {"id": "3", "cursor_field": 2}, "stream": "stream1"},
{"data": {"id": "4", "cursor_field": 3}, "stream": "stream1"},
{"stream1": {"cursor_field": 2}},
{"stream1": {"cursor_field": 2}}, # see Cursor.ensure_at_least_one_state_emitted
{"cursor_field": 2},
{"cursor_field": 2}, # see Cursor.ensure_at_least_one_state_emitted
]
)
.set_log_levels({"ERROR", "WARN", "WARNING", "INFO", "DEBUG"})

View File

@@ -357,11 +357,11 @@ test_incremental_stream_with_slice_boundaries = (
[
{"data": {"id": "1", "cursor_field": 0}, "stream": "stream1"},
{"data": {"id": "2", "cursor_field": 1}, "stream": "stream1"},
{"stream1": {"cursor_field": 1}},
{"cursor_field": 1},
{"data": {"id": "3", "cursor_field": 2}, "stream": "stream1"},
{"data": {"id": "4", "cursor_field": 3}, "stream": "stream1"},
{"stream1": {"cursor_field": 2}},
{"stream1": {"cursor_field": 2}}, # see Cursor.ensure_at_least_one_state_emitted
{"cursor_field": 2},
{"cursor_field": 2}, # see Cursor.ensure_at_least_one_state_emitted
]
)
.set_log_levels({"ERROR", "WARN", "WARNING", "INFO", "DEBUG"})
@@ -403,8 +403,8 @@ test_incremental_stream_without_slice_boundaries = (
[
{"data": {"id": "1", "cursor_field": 0}, "stream": "stream1"},
{"data": {"id": "2", "cursor_field": 3}, "stream": "stream1"},
{"stream1": {"cursor_field": 3}},
{"stream1": {"cursor_field": 3}}, # see Cursor.ensure_at_least_one_state_emitted
{"cursor_field": 3},
{"cursor_field": 3}, # see Cursor.ensure_at_least_one_state_emitted
]
)
.set_log_levels({"ERROR", "WARN", "WARNING", "INFO", "DEBUG"})

View File

@@ -244,25 +244,14 @@ class StreamFacadeTest(unittest.TestCase):
assert actual_stream_data == expected_stream_data
def test_read_records_full_refresh(self):
def test_read_records(self):
expected_stream_data = [{"data": 1}, {"data": 2}]
records = [Record(data, "stream") for data in expected_stream_data]
partition = Mock()
partition.read.return_value = records
self._abstract_stream.generate_partitions.return_value = [partition]
actual_stream_data = list(self._facade.read_full_refresh(None, None, None))
assert actual_stream_data == expected_stream_data
def test_read_records_incremental(self):
expected_stream_data = [{"data": 1}, {"data": 2}]
records = [Record(data, "stream") for data in expected_stream_data]
partition = Mock()
partition.read.return_value = records
self._abstract_stream.generate_partitions.return_value = [partition]
actual_stream_data = list(self._facade.read_incremental(None, None, None, None, None, None, None))
actual_stream_data = list(self._facade.read(None, None, None, None, None, None))
assert actual_stream_data == expected_stream_data

View File

@@ -1,17 +1,34 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
import logging
from typing import Any, Iterable, List, Mapping, Optional, Union
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Union
from unittest.mock import Mock
import pytest
from airbyte_cdk.models import AirbyteLogMessage, AirbyteMessage, Level, SyncMode
from airbyte_cdk.models import (
AirbyteLogMessage,
AirbyteMessage,
AirbyteStateBlob,
AirbyteStateMessage,
AirbyteStateType,
AirbyteStream,
AirbyteStreamState,
ConfiguredAirbyteStream,
DestinationSyncMode,
Level,
StreamDescriptor,
SyncMode,
)
from airbyte_cdk.models import Type as MessageType
from airbyte_cdk.sources.message import InMemoryMessageRepository
from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager
from airbyte_cdk.sources.message import InMemoryMessageRepository, MessageRepository
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.sources.streams.concurrent.adapters import StreamFacade
from airbyte_cdk.sources.streams.concurrent.cursor import NoopCursor
from airbyte_cdk.sources.streams.concurrent.cursor import Cursor, NoopCursor
from airbyte_cdk.sources.streams.concurrent.partitions.partition import Partition
from airbyte_cdk.sources.streams.concurrent.partitions.record import Record
from airbyte_cdk.sources.streams.core import StreamData
from airbyte_cdk.sources.utils.schema_helpers import InternalConfig
from airbyte_cdk.sources.utils.slice_logger import DebugSliceLogger
@@ -49,20 +66,66 @@ class _MockStream(Stream):
return {}
class MockConcurrentCursor(Cursor):
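    """Test double for a concurrent Cursor: observe() keeps the latest created_at seen for each
    partition, and close_partition() emits a single per-stream STATE message built from that map."""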
_state: MutableMapping[str, Any]
_message_repository: MessageRepository
def __init__(self, message_repository: MessageRepository):
self._message_repository = message_repository
self._state = {}
@property
def state(self) -> MutableMapping[str, Any]:
return self._state
def observe(self, record: Record) -> None:
partition = str(record.data.get("partition"))
timestamp = record.data.get("created_at")
self._state[partition] = {"created_at": timestamp}
def close_partition(self, partition: Partition) -> None:
self._message_repository.emit_message(
AirbyteMessage(
type=MessageType.STATE,
state=AirbyteStateMessage(
type=AirbyteStateType.STREAM,
stream=AirbyteStreamState(
stream_descriptor=StreamDescriptor(name='__mock_stream', namespace=None),
stream_state=AirbyteStateBlob(**self._state),
)
),
)
)
def ensure_at_least_one_state_emitted(self) -> None:
pass
def _stream(slice_to_partition_mapping, slice_logger, logger, message_repository):
return _MockStream(slice_to_partition_mapping)
def _concurrent_stream(slice_to_partition_mapping, slice_logger, logger, message_repository):
def _concurrent_stream(slice_to_partition_mapping, slice_logger, logger, message_repository, cursor: Cursor = NoopCursor()):
stream = _stream(slice_to_partition_mapping, slice_logger, logger, message_repository)
source = Mock()
source._slice_logger = slice_logger
source.message_repository = message_repository
stream = StreamFacade.create_from_stream(stream, source, logger, _NO_STATE, NoopCursor())
stream = StreamFacade.create_from_stream(stream, source, logger, _NO_STATE, cursor)
stream.logger.setLevel(logger.level)
return stream
def _incremental_stream(slice_to_partition_mapping, slice_logger, logger, message_repository, timestamp):
stream = _stream(slice_to_partition_mapping, slice_logger, logger, message_repository)
stream.state = {"created_at": timestamp}
return stream
def _incremental_concurrent_stream(slice_to_partition_mapping, slice_logger, logger, message_repository, cursor):
stream = _concurrent_stream(slice_to_partition_mapping, slice_logger, logger, message_repository, cursor)
return stream
@pytest.mark.parametrize(
"constructor",
[
@@ -73,6 +136,8 @@ def _concurrent_stream(slice_to_partition_mapping, slice_logger, logger, message
def test_full_refresh_read_a_single_slice_with_debug(constructor):
# This test verifies that a concurrent stream adapted from a Stream behaves the same as the Stream object.
# It is done by running the same test cases on both streams
configured_stream = ConfiguredAirbyteStream(stream=AirbyteStream(name="mock_stream", supported_sync_modes=[SyncMode.full_refresh], json_schema={}), sync_mode=SyncMode.full_refresh, destination_sync_mode=DestinationSyncMode.overwrite)
internal_config = InternalConfig()
records = [
{"id": 1, "partition": 1},
{"id": 2, "partition": 1},
@@ -82,6 +147,7 @@ def test_full_refresh_read_a_single_slice_with_debug(constructor):
logger = _mock_logger(True)
message_repository = InMemoryMessageRepository(Level.DEBUG)
stream = constructor(slice_to_partition, slice_logger, logger, message_repository)
state_manager = ConnectorStateManager(stream_instance_map={})
expected_records = [
AirbyteMessage(
@@ -94,7 +160,22 @@ def test_full_refresh_read_a_single_slice_with_debug(constructor):
*records,
]
actual_records = _read(stream, logger, slice_logger, message_repository)
# Temporary check to only validate the final state message for synchronous sources since it has not been implemented for concurrent yet
if constructor == _stream:
expected_records.append(
AirbyteMessage(
type=MessageType.STATE,
state=AirbyteStateMessage(
type=AirbyteStateType.STREAM,
stream=AirbyteStreamState(
stream_descriptor=StreamDescriptor(name='__mock_stream', namespace=None),
stream_state=AirbyteStateBlob(__ab_full_refresh_state_message=True),
)
),
),
)
actual_records = _read(stream, configured_stream, logger, slice_logger, message_repository, state_manager, internal_config)
assert expected_records == actual_records
@@ -109,9 +190,12 @@ def test_full_refresh_read_a_single_slice_with_debug(constructor):
def test_full_refresh_read_a_single_slice(constructor):
# This test verifies that a concurrent stream adapted from a Stream behaves the same as the Stream object.
# It is done by running the same test cases on both streams
configured_stream = ConfiguredAirbyteStream(stream=AirbyteStream(name="mock_stream", supported_sync_modes=[SyncMode.full_refresh], json_schema={}), sync_mode=SyncMode.full_refresh, destination_sync_mode=DestinationSyncMode.overwrite)
internal_config = InternalConfig()
logger = _mock_logger()
slice_logger = DebugSliceLogger()
message_repository = InMemoryMessageRepository(Level.INFO)
state_manager = ConnectorStateManager(stream_instance_map={})
records = [
{"id": 1, "partition": 1},
@@ -122,7 +206,22 @@ def test_full_refresh_read_a_single_slice(constructor):
expected_records = [*records]
actual_records = _read(stream, logger, slice_logger, message_repository)
# Temporary check to only validate the final state message for synchronous sources since it has not been implemented for concurrent yet
if constructor == _stream:
expected_records.append(
AirbyteMessage(
type=MessageType.STATE,
state=AirbyteStateMessage(
type=AirbyteStateType.STREAM,
stream=AirbyteStreamState(
stream_descriptor=StreamDescriptor(name='__mock_stream', namespace=None),
stream_state=AirbyteStateBlob(__ab_full_refresh_state_message=True),
)
),
),
)
actual_records = _read(stream, configured_stream, logger, slice_logger, message_repository, state_manager, internal_config)
assert expected_records == actual_records
@@ -137,9 +236,12 @@ def test_full_refresh_read_a_single_slice(constructor):
def test_full_refresh_read_a_two_slices(constructor):
# This test verifies that a concurrent stream adapted from a Stream behaves the same as the Stream object
# It is done by running the same test cases on both streams
configured_stream = ConfiguredAirbyteStream(stream=AirbyteStream(name="mock_stream", supported_sync_modes=[SyncMode.full_refresh], json_schema={}), sync_mode=SyncMode.full_refresh, destination_sync_mode=DestinationSyncMode.overwrite)
internal_config = InternalConfig()
logger = _mock_logger()
slice_logger = DebugSliceLogger()
message_repository = InMemoryMessageRepository(Level.INFO)
state_manager = ConnectorStateManager(stream_instance_map={})
records_partition_1 = [
{"id": 1, "partition": 1},
@@ -157,16 +259,111 @@ def test_full_refresh_read_a_two_slices(constructor):
*records_partition_2,
]
actual_records = _read(stream, logger, slice_logger, message_repository)
# Temporary check to only validate the final state message for synchronous sources since it has not been implemented for concurrent yet
if constructor == _stream:
expected_records.append(
AirbyteMessage(
type=MessageType.STATE,
state=AirbyteStateMessage(
type=AirbyteStateType.STREAM,
stream=AirbyteStreamState(
stream_descriptor=StreamDescriptor(name='__mock_stream', namespace=None),
stream_state=AirbyteStateBlob(__ab_full_refresh_state_message=True),
)
),
),
)
actual_records = _read(stream, configured_stream, logger, slice_logger, message_repository, state_manager, internal_config)
for record in expected_records:
assert record in actual_records
assert len(expected_records) == len(actual_records)
def _read(stream, logger, slice_logger, message_repository):
def test_incremental_read_two_slices():
# This test verifies that a stream running in incremental mode emits state messages correctly
configured_stream = ConfiguredAirbyteStream(stream=AirbyteStream(name="mock_stream", supported_sync_modes=[SyncMode.full_refresh, SyncMode.incremental], json_schema={}), sync_mode=SyncMode.incremental, destination_sync_mode=DestinationSyncMode.overwrite)
internal_config = InternalConfig()
logger = _mock_logger()
slice_logger = DebugSliceLogger()
message_repository = InMemoryMessageRepository(Level.INFO)
state_manager = ConnectorStateManager(stream_instance_map={})
timestamp = "1708899427"
records_partition_1 = [
{"id": 1, "partition": 1},
{"id": 2, "partition": 1},
]
records_partition_2 = [
{"id": 3, "partition": 2},
{"id": 4, "partition": 2},
]
slice_to_partition = {1: records_partition_1, 2: records_partition_2}
stream = _incremental_stream(slice_to_partition, slice_logger, logger, message_repository, timestamp)
expected_records = [
*records_partition_1,
_create_state_message("__mock_stream", {"created_at": timestamp}),
*records_partition_2,
_create_state_message("__mock_stream", {"created_at": timestamp})
]
actual_records = _read(stream, configured_stream, logger, slice_logger, message_repository, state_manager, internal_config)
for record in expected_records:
assert record in actual_records
assert len(expected_records) == len(actual_records)
def test_concurrent_incremental_read_two_slices():
# This test verifies that an incremental concurrent stream manages state correctly for multiple slices syncing concurrently
configured_stream = ConfiguredAirbyteStream(stream=AirbyteStream(name="mock_stream", supported_sync_modes=[SyncMode.full_refresh, SyncMode.incremental], json_schema={}), sync_mode=SyncMode.incremental, destination_sync_mode=DestinationSyncMode.overwrite)
internal_config = InternalConfig()
logger = _mock_logger()
slice_logger = DebugSliceLogger()
message_repository = InMemoryMessageRepository(Level.INFO)
state_manager = ConnectorStateManager(stream_instance_map={})
slice_timestamp_1 = "1708850000"
slice_timestamp_2 = "1708950000"
cursor = MockConcurrentCursor(message_repository)
records_partition_1 = [
{"id": 1, "partition": 1, "created_at": "1708800000"},
{"id": 2, "partition": 1, "created_at": slice_timestamp_1},
]
records_partition_2 = [
{"id": 3, "partition": 2, "created_at": "1708900000"},
{"id": 4, "partition": 2, "created_at": slice_timestamp_2},
]
slice_to_partition = {1: records_partition_1, 2: records_partition_2}
stream = _incremental_concurrent_stream(slice_to_partition, slice_logger, logger, message_repository, cursor)
expected_records = [
*records_partition_1,
*records_partition_2,
]
expected_state = _create_state_message("__mock_stream", {"1": {"created_at": slice_timestamp_1}, "2": {"created_at": slice_timestamp_2}})
actual_records = _read(stream, configured_stream, logger, slice_logger, message_repository, state_manager, internal_config)
for record in expected_records:
assert record in actual_records
assert len(expected_records) == len(actual_records)
# We don't have a real source that reads from the message_repository for state, so we read from the queue directly to verify
# the cursor observed records correctly and updated partition states
mock_partition = Mock()
cursor.close_partition(mock_partition)
actual_state = [state for state in message_repository.consume_queue()]
assert len(actual_state) == 1
assert actual_state[0] == expected_state
def _read(stream, configured_stream, logger, slice_logger, message_repository, state_manager, internal_config):
records = []
for record in stream.read_full_refresh(_A_CURSOR_FIELD, logger, slice_logger):
for record in stream.read(configured_stream, logger, slice_logger, {}, state_manager, internal_config):
for message in message_repository.consume_queue():
records.append(message)
records.append(record)
@@ -192,3 +389,16 @@ def _mock_logger(enabled_for_debug=False):
logger.isEnabledFor.return_value = enabled_for_debug
logger.level = logging.DEBUG if enabled_for_debug else logging.INFO
return logger
def _create_state_message(stream: str, state: Mapping[str, Any]) -> AirbyteMessage:
return AirbyteMessage(
type=MessageType.STATE,
state=AirbyteStateMessage(
type=AirbyteStateType.STREAM,
stream=AirbyteStreamState(
stream_descriptor=StreamDescriptor(name=stream, namespace=None),
stream_state=AirbyteStateBlob(**state),
)
),
)

View File

@@ -54,14 +54,12 @@ class MockSource(AbstractSource):
self,
check_lambda: Callable[[], Tuple[bool, Optional[Any]]] = None,
streams: List[Stream] = None,
per_stream: bool = True,
message_repository: MessageRepository = None,
exception_on_missing_stream: bool = True,
stop_sync_on_stream_failure: bool = False,
):
self._streams = streams
self.check_lambda = check_lambda
self.per_stream = per_stream
self.exception_on_missing_stream = exception_on_missing_stream
self._message_repository = message_repository
self._stop_sync_on_stream_failure = stop_sync_on_stream_failure
@@ -286,7 +284,7 @@ def test_read_stream_emits_repository_message_before_record(mocker, message_repo
stream = MockStream(name="my_stream")
mocker.patch.object(MockStream, "get_json_schema", return_value={})
mocker.patch.object(MockStream, "read_records", side_effect=[[{"a record": "a value"}, {"another record": "another value"}]])
message_repository.consume_queue.side_effect = [[message for message in [MESSAGE_FROM_REPOSITORY]], []]
message_repository.consume_queue.side_effect = [[message for message in [MESSAGE_FROM_REPOSITORY]], [], []]
source = MockSource(streams=[stream], message_repository=message_repository)
@@ -357,19 +355,16 @@ def _as_stream_status(stream: str, status: AirbyteStreamStatus) -> AirbyteMessag
return AirbyteMessage(type=MessageType.TRACE, trace=trace_message)
def _as_state(state_data: Dict[str, Any], stream_name: str = "", per_stream_state: Dict[str, Any] = None):
if per_stream_state:
return AirbyteMessage(
type=Type.STATE,
state=AirbyteStateMessage(
type=AirbyteStateType.STREAM,
stream=AirbyteStreamState(
stream_descriptor=StreamDescriptor(name=stream_name), stream_state=AirbyteStateBlob.parse_obj(per_stream_state)
),
data=state_data,
def _as_state(stream_name: str = "", per_stream_state: Dict[str, Any] = None):
return AirbyteMessage(
type=Type.STATE,
state=AirbyteStateMessage(
type=AirbyteStateType.STREAM,
stream=AirbyteStreamState(
stream_descriptor=StreamDescriptor(name=stream_name), stream_state=AirbyteStateBlob.parse_obj(per_stream_state)
),
)
return AirbyteMessage(type=Type.STATE, state=AirbyteStateMessage(data=state_data))
),
)
def _as_error_trace(
@@ -410,8 +405,8 @@ def _fix_emitted_at(messages: List[AirbyteMessage]) -> List[AirbyteMessage]:
def test_valid_full_refresh_read_no_slices(mocker):
"""Tests that running a full refresh sync on streams which don't specify slices produces the expected AirbyteMessages"""
stream_output = [{"k1": "v1"}, {"k2": "v2"}]
s1 = MockStream([({"sync_mode": SyncMode.full_refresh}, stream_output)], name="s1")
s2 = MockStream([({"sync_mode": SyncMode.full_refresh}, stream_output)], name="s2")
s1 = MockStream([({"stream_state": {}, "sync_mode": SyncMode.full_refresh}, stream_output)], name="s1")
s2 = MockStream([({"stream_state": {}, "sync_mode": SyncMode.full_refresh}, stream_output)], name="s2")
mocker.patch.object(MockStream, "get_json_schema", return_value={})
@@ -428,10 +423,12 @@ def test_valid_full_refresh_read_no_slices(mocker):
_as_stream_status("s1", AirbyteStreamStatus.STARTED),
_as_stream_status("s1", AirbyteStreamStatus.RUNNING),
*_as_records("s1", stream_output),
_as_state("s1", {"__ab_full_refresh_state_message": True}),
_as_stream_status("s1", AirbyteStreamStatus.COMPLETE),
_as_stream_status("s2", AirbyteStreamStatus.STARTED),
_as_stream_status("s2", AirbyteStreamStatus.RUNNING),
*_as_records("s2", stream_output),
_as_state("s2", {"__ab_full_refresh_state_message": True}),
_as_stream_status("s2", AirbyteStreamStatus.COMPLETE),
]
)
@@ -445,11 +442,11 @@ def test_valid_full_refresh_read_with_slices(mocker):
slices = [{"1": "1"}, {"2": "2"}]
# When attempting to sync a slice, just output that slice as a record
s1 = MockStream(
[({"sync_mode": SyncMode.full_refresh, "stream_slice": s}, [s]) for s in slices],
[({"stream_state": {}, "sync_mode": SyncMode.full_refresh, "stream_slice": s}, [s]) for s in slices],
name="s1",
)
s2 = MockStream(
[({"sync_mode": SyncMode.full_refresh, "stream_slice": s}, [s]) for s in slices],
[({"stream_state": {}, "sync_mode": SyncMode.full_refresh, "stream_slice": s}, [s]) for s in slices],
name="s2",
)
@@ -469,10 +466,12 @@ def test_valid_full_refresh_read_with_slices(mocker):
_as_stream_status("s1", AirbyteStreamStatus.STARTED),
_as_stream_status("s1", AirbyteStreamStatus.RUNNING),
*_as_records("s1", slices),
_as_state("s1", {"__ab_full_refresh_state_message": True}),
_as_stream_status("s1", AirbyteStreamStatus.COMPLETE),
_as_stream_status("s2", AirbyteStreamStatus.STARTED),
_as_stream_status("s2", AirbyteStreamStatus.RUNNING),
*_as_records("s2", slices),
_as_state("s2", {"__ab_full_refresh_state_message": True}),
_as_stream_status("s2", AirbyteStreamStatus.COMPLETE),
]
)
@@ -482,6 +481,73 @@ def test_valid_full_refresh_read_with_slices(mocker):
assert expected == messages
def test_full_refresh_does_not_use_incoming_state(mocker):
"""Tests that running a full refresh sync does not use an incoming state message from the platform"""
slices = [{"1": "1"}, {"2": "2"}]
# When attempting to sync a slice, just output that slice as a record
s1 = MockStream(
[({"stream_state": {}, "sync_mode": SyncMode.full_refresh, "stream_slice": s}, [s]) for s in slices],
name="s1",
)
s2 = MockStream(
[({"stream_state": {}, "sync_mode": SyncMode.full_refresh, "stream_slice": s}, [s]) for s in slices],
name="s2",
)
def stream_slices_side_effect(stream_state: Mapping[str, Any], **kwargs) -> List[Mapping[str, Any]]:
if stream_state:
return slices[1:]
else:
return slices
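    # Because the side effect drops the first slice whenever non-empty state is passed in, seeing
    # records for both slices below proves the incoming platform state was not handed to the stream.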
mocker.patch.object(MockStream, "get_json_schema", return_value={})
mocker.patch.object(MockStream, "stream_slices", side_effect=stream_slices_side_effect)
state = [
AirbyteStateMessage(
type=AirbyteStateType.STREAM,
stream=AirbyteStreamState(
stream_descriptor=StreamDescriptor(name="s1"),
stream_state=AirbyteStateBlob.parse_obj({"created_at": "2024-01-31"}),
),
),
AirbyteStateMessage(
type=AirbyteStateType.STREAM,
stream=AirbyteStreamState(
stream_descriptor=StreamDescriptor(name="s2"),
stream_state=AirbyteStateBlob.parse_obj({"__ab_full_refresh_state_message": True}),
),
),
]
src = MockSource(streams=[s1, s2])
catalog = ConfiguredAirbyteCatalog(
streams=[
_configured_stream(s1, SyncMode.full_refresh),
_configured_stream(s2, SyncMode.full_refresh),
]
)
expected = _fix_emitted_at(
[
_as_stream_status("s1", AirbyteStreamStatus.STARTED),
_as_stream_status("s1", AirbyteStreamStatus.RUNNING),
*_as_records("s1", slices),
_as_state("s1", {"__ab_full_refresh_state_message": True}),
_as_stream_status("s1", AirbyteStreamStatus.COMPLETE),
_as_stream_status("s2", AirbyteStreamStatus.STARTED),
_as_stream_status("s2", AirbyteStreamStatus.RUNNING),
*_as_records("s2", slices),
_as_state("s2", {"__ab_full_refresh_state_message": True}),
_as_stream_status("s2", AirbyteStreamStatus.COMPLETE),
]
)
messages = _fix_emitted_at(list(src.read(logger, {}, catalog, state)))
assert messages == expected
@pytest.mark.parametrize(
"slices",
[[{"1": "1"}, {"2": "2"}], [{"date": datetime.date(year=2023, month=1, day=1)}, {"date": datetime.date(year=2023, month=1, day=1)}]],
@@ -491,7 +557,7 @@ def test_read_full_refresh_with_slices_sends_slice_messages(mocker, slices):
debug_logger = logging.getLogger("airbyte.debug")
debug_logger.setLevel(logging.DEBUG)
stream = MockStream(
[({"sync_mode": SyncMode.full_refresh, "stream_slice": s}, [s]) for s in slices],
[({"stream_state": {}, "sync_mode": SyncMode.full_refresh, "stream_slice": s}, [s]) for s in slices],
name="s1",
)
@@ -544,14 +610,7 @@ class TestIncrementalRead:
pytest.param(False, id="test_incoming_stream_state_as_per_stream_format"),
],
)
@pytest.mark.parametrize(
"per_stream_enabled",
[
pytest.param(True, id="test_source_emits_state_as_per_stream_format"),
pytest.param(False, id="test_source_emits_state_as_per_stream_format"),
],
)
def test_with_state_attribute(self, mocker, use_legacy, per_stream_enabled):
def test_with_state_attribute(self, mocker, use_legacy):
"""Test correct state passing for the streams that have a state attribute"""
stream_output = [{"k1": "v1"}, {"k2": "v2"}]
old_state = {"cursor": "old_value"}
@@ -589,7 +648,7 @@ class TestIncrementalRead:
return_value=new_state_from_connector,
)
mocker.patch.object(MockStreamWithState, "get_json_schema", return_value={})
src = MockSource(streams=[stream_1, stream_2], per_stream=per_stream_enabled)
src = MockSource(streams=[stream_1, stream_2])
catalog = ConfiguredAirbyteCatalog(
streams=[
_configured_stream(stream_1, SyncMode.incremental),
@@ -603,17 +662,13 @@ class TestIncrementalRead:
_as_stream_status("s1", AirbyteStreamStatus.RUNNING),
_as_record("s1", stream_output[0]),
_as_record("s1", stream_output[1]),
_as_state({"s1": new_state_from_connector}, "s1", new_state_from_connector)
if per_stream_enabled
else _as_state({"s1": new_state_from_connector}),
_as_state("s1", new_state_from_connector),
_as_stream_status("s1", AirbyteStreamStatus.COMPLETE),
_as_stream_status("s2", AirbyteStreamStatus.STARTED),
_as_stream_status("s2", AirbyteStreamStatus.RUNNING),
_as_record("s2", stream_output[0]),
_as_record("s2", stream_output[1]),
_as_state({"s1": new_state_from_connector, "s2": new_state_from_connector}, "s2", new_state_from_connector)
if per_stream_enabled
else _as_state({"s1": new_state_from_connector, "s2": new_state_from_connector}),
_as_state("s2", new_state_from_connector),
_as_stream_status("s2", AirbyteStreamStatus.COMPLETE),
]
)
@@ -633,14 +688,7 @@ class TestIncrementalRead:
pytest.param(False, id="test_incoming_stream_state_as_per_stream_format"),
],
)
@pytest.mark.parametrize(
"per_stream_enabled",
[
pytest.param(True, id="test_source_emits_state_as_per_stream_format"),
pytest.param(False, id="test_source_emits_state_as_per_stream_format"),
],
)
def test_with_checkpoint_interval(self, mocker, use_legacy, per_stream_enabled):
def test_with_checkpoint_interval(self, mocker, use_legacy):
"""Tests that an incremental read which doesn't specify a checkpoint interval outputs a STATE message
after reading N records within a stream.
"""
@@ -670,7 +718,7 @@ class TestIncrementalRead:
return_value=1,
)
src = MockSource(streams=[stream_1, stream_2], per_stream=per_stream_enabled)
src = MockSource(streams=[stream_1, stream_2])
catalog = ConfiguredAirbyteCatalog(
streams=[
_configured_stream(stream_1, SyncMode.incremental),
@@ -683,18 +731,18 @@ class TestIncrementalRead:
_as_stream_status("s1", AirbyteStreamStatus.STARTED),
_as_stream_status("s1", AirbyteStreamStatus.RUNNING),
_as_record("s1", stream_output[0]),
_as_state({"s1": state}, "s1", state) if per_stream_enabled else _as_state({"s1": state}),
_as_state("s1", state),
_as_record("s1", stream_output[1]),
_as_state({"s1": state}, "s1", state) if per_stream_enabled else _as_state({"s1": state}),
_as_state({"s1": state}, "s1", state) if per_stream_enabled else _as_state({"s1": state}),
_as_state("s1", state),
_as_state("s1", state),
_as_stream_status("s1", AirbyteStreamStatus.COMPLETE),
_as_stream_status("s2", AirbyteStreamStatus.STARTED),
_as_stream_status("s2", AirbyteStreamStatus.RUNNING),
_as_record("s2", stream_output[0]),
_as_state({"s1": state, "s2": state}, "s2", state) if per_stream_enabled else _as_state({"s1": state, "s2": state}),
_as_state("s2", state),
_as_record("s2", stream_output[1]),
_as_state({"s1": state, "s2": state}, "s2", state) if per_stream_enabled else _as_state({"s1": state, "s2": state}),
_as_state({"s1": state, "s2": state}, "s2", state) if per_stream_enabled else _as_state({"s1": state, "s2": state}),
_as_state("s2", state),
_as_state("s2", state),
_as_stream_status("s2", AirbyteStreamStatus.COMPLETE),
]
)
@@ -709,14 +757,7 @@ class TestIncrementalRead:
pytest.param(False, id="test_incoming_stream_state_as_per_stream_format"),
],
)
@pytest.mark.parametrize(
"per_stream_enabled",
[
pytest.param(True, id="test_source_emits_state_as_per_stream_format"),
pytest.param(False, id="test_source_emits_state_as_per_stream_format"),
],
)
def test_with_no_interval(self, mocker, use_legacy, per_stream_enabled):
def test_with_no_interval(self, mocker, use_legacy):
"""Tests that an incremental read which doesn't specify a checkpoint interval outputs
a STATE message only after fully reading the stream and does not output any STATE messages during syncing the stream.
"""
@@ -739,7 +780,7 @@ class TestIncrementalRead:
mocker.patch.object(MockStream, "supports_incremental", return_value=True)
mocker.patch.object(MockStream, "get_json_schema", return_value={})
src = MockSource(streams=[stream_1, stream_2], per_stream=per_stream_enabled)
src = MockSource(streams=[stream_1, stream_2])
catalog = ConfiguredAirbyteCatalog(
streams=[
_configured_stream(stream_1, SyncMode.incremental),
@@ -752,12 +793,12 @@ class TestIncrementalRead:
_as_stream_status("s1", AirbyteStreamStatus.STARTED),
_as_stream_status("s1", AirbyteStreamStatus.RUNNING),
*_as_records("s1", stream_output),
_as_state({"s1": state}, "s1", state) if per_stream_enabled else _as_state({"s1": state}),
_as_state("s1", state),
_as_stream_status("s1", AirbyteStreamStatus.COMPLETE),
_as_stream_status("s2", AirbyteStreamStatus.STARTED),
_as_stream_status("s2", AirbyteStreamStatus.RUNNING),
*_as_records("s2", stream_output),
_as_state({"s1": state, "s2": state}, "s2", state) if per_stream_enabled else _as_state({"s1": state, "s2": state}),
_as_state("s2", state),
_as_stream_status("s2", AirbyteStreamStatus.COMPLETE),
]
)
@@ -773,14 +814,7 @@ class TestIncrementalRead:
pytest.param(False, id="test_incoming_stream_state_as_per_stream_format"),
],
)
@pytest.mark.parametrize(
"per_stream_enabled",
[
pytest.param(True, id="test_source_emits_state_as_per_stream_format"),
pytest.param(False, id="test_source_emits_state_as_per_stream_format"),
],
)
def test_with_slices(self, mocker, use_legacy, per_stream_enabled):
def test_with_slices(self, mocker, use_legacy):
"""Tests that an incremental read which uses slices outputs each record in the slice followed by a STATE message, for each slice"""
if use_legacy:
input_state = defaultdict(dict)
@@ -823,7 +857,7 @@ class TestIncrementalRead:
mocker.patch.object(MockStream, "get_json_schema", return_value={})
mocker.patch.object(MockStream, "stream_slices", return_value=slices)
src = MockSource(streams=[stream_1, stream_2], per_stream=per_stream_enabled)
src = MockSource(streams=[stream_1, stream_2])
catalog = ConfiguredAirbyteCatalog(
streams=[
_configured_stream(stream_1, SyncMode.incremental),
@@ -837,19 +871,19 @@ class TestIncrementalRead:
_as_stream_status("s1", AirbyteStreamStatus.RUNNING),
# stream 1 slice 1
*_as_records("s1", stream_output),
_as_state({"s1": state}, "s1", state) if per_stream_enabled else _as_state({"s1": state}),
_as_state("s1", state),
# stream 1 slice 2
*_as_records("s1", stream_output),
_as_state({"s1": state}, "s1", state) if per_stream_enabled else _as_state({"s1": state}),
_as_state("s1", state),
_as_stream_status("s1", AirbyteStreamStatus.COMPLETE),
_as_stream_status("s2", AirbyteStreamStatus.STARTED),
_as_stream_status("s2", AirbyteStreamStatus.RUNNING),
# stream 2 slice 1
*_as_records("s2", stream_output),
_as_state({"s1": state, "s2": state}, "s2", state) if per_stream_enabled else _as_state({"s1": state, "s2": state}),
_as_state("s2", state),
# stream 2 slice 2
*_as_records("s2", stream_output),
_as_state({"s1": state, "s2": state}, "s2", state) if per_stream_enabled else _as_state({"s1": state, "s2": state}),
_as_state("s2", state),
_as_stream_status("s2", AirbyteStreamStatus.COMPLETE),
]
)
@@ -865,15 +899,8 @@ class TestIncrementalRead:
pytest.param(False, id="test_incoming_stream_state_as_per_stream_format"),
],
)
@pytest.mark.parametrize(
"per_stream_enabled",
[
pytest.param(True, id="test_source_emits_state_as_per_stream_format"),
pytest.param(False, id="test_source_emits_state_as_per_stream_format"),
],
)
@pytest.mark.parametrize("slices", [pytest.param([], id="test_slices_as_list"), pytest.param(iter([]), id="test_slices_as_iterator")])
def test_no_slices(self, mocker, use_legacy, per_stream_enabled, slices):
def test_no_slices(self, mocker, use_legacy, slices):
"""
Tests that an incremental read returns at least one state message even if no records were read:
1. outputs a state message after reading the entire stream
@@ -926,7 +953,7 @@ class TestIncrementalRead:
return_value=2,
)
src = MockSource(streams=[stream_1, stream_2], per_stream=per_stream_enabled)
src = MockSource(streams=[stream_1, stream_2])
catalog = ConfiguredAirbyteCatalog(
streams=[
_configured_stream(stream_1, SyncMode.incremental),
@@ -937,10 +964,10 @@ class TestIncrementalRead:
expected = _fix_emitted_at(
[
_as_stream_status("s1", AirbyteStreamStatus.STARTED),
_as_state({"s1": state}, "s1", state) if per_stream_enabled else _as_state({"s1": state}),
_as_state("s1", state),
_as_stream_status("s1", AirbyteStreamStatus.COMPLETE),
_as_stream_status("s2", AirbyteStreamStatus.STARTED),
_as_state({"s1": state, "s2": state}, "s2", state) if per_stream_enabled else _as_state({"s1": state, "s2": state}),
_as_state("s2", state),
_as_stream_status("s2", AirbyteStreamStatus.COMPLETE),
]
)
@@ -956,14 +983,7 @@ class TestIncrementalRead:
pytest.param(False, id="test_incoming_stream_state_as_per_stream_format"),
],
)
@pytest.mark.parametrize(
"per_stream_enabled",
[
pytest.param(True, id="test_source_emits_state_as_per_stream_format"),
pytest.param(False, id="test_source_emits_state_as_per_stream_format"),
],
)
def test_with_slices_and_interval(self, mocker, use_legacy, per_stream_enabled):
def test_with_slices_and_interval(self, mocker, use_legacy):
"""
Tests that an incremental read which uses slices and a checkpoint interval:
1. outputs all records
@@ -1016,7 +1036,7 @@ class TestIncrementalRead:
return_value=2,
)
src = MockSource(streams=[stream_1, stream_2], per_stream=per_stream_enabled)
src = MockSource(streams=[stream_1, stream_2])
catalog = ConfiguredAirbyteCatalog(
streams=[
_configured_stream(stream_1, SyncMode.incremental),
@@ -1031,32 +1051,32 @@ class TestIncrementalRead:
_as_stream_status("s1", AirbyteStreamStatus.RUNNING),
_as_record("s1", stream_output[0]),
_as_record("s1", stream_output[1]),
_as_state({"s1": state}, "s1", state) if per_stream_enabled else _as_state({"s1": state}),
_as_state("s1", state),
_as_record("s1", stream_output[2]),
_as_state({"s1": state}, "s1", state) if per_stream_enabled else _as_state({"s1": state}),
_as_state("s1", state),
# stream 1 slice 2
_as_record("s1", stream_output[0]),
_as_state({"s1": state}, "s1", state) if per_stream_enabled else _as_state({"s1": state}),
_as_state("s1", state),
_as_record("s1", stream_output[1]),
_as_record("s1", stream_output[2]),
_as_state({"s1": state}, "s1", state) if per_stream_enabled else _as_state({"s1": state}),
_as_state({"s1": state}, "s1", state) if per_stream_enabled else _as_state({"s1": state}),
_as_state("s1", state),
_as_state("s1", state),
_as_stream_status("s1", AirbyteStreamStatus.COMPLETE),
# stream 2 slice 1
_as_stream_status("s2", AirbyteStreamStatus.STARTED),
_as_stream_status("s2", AirbyteStreamStatus.RUNNING),
_as_record("s2", stream_output[0]),
_as_record("s2", stream_output[1]),
_as_state({"s1": state, "s2": state}, "s2", state) if per_stream_enabled else _as_state({"s1": state, "s2": state}),
_as_state("s2", state),
_as_record("s2", stream_output[2]),
_as_state({"s1": state, "s2": state}, "s2", state) if per_stream_enabled else _as_state({"s1": state, "s2": state}),
_as_state("s2", state),
# stream 2 slice 2
_as_record("s2", stream_output[0]),
_as_state({"s1": state, "s2": state}, "s2", state) if per_stream_enabled else _as_state({"s1": state, "s2": state}),
_as_state("s2", state),
_as_record("s2", stream_output[1]),
_as_record("s2", stream_output[2]),
_as_state({"s1": state, "s2": state}, "s2", state) if per_stream_enabled else _as_state({"s1": state, "s2": state}),
_as_state({"s1": state, "s2": state}, "s2", state) if per_stream_enabled else _as_state({"s1": state, "s2": state}),
_as_state("s2", state),
_as_state("s2", state),
_as_stream_status("s2", AirbyteStreamStatus.COMPLETE),
]
)
@@ -1065,13 +1085,7 @@ class TestIncrementalRead:
assert messages == expected
@pytest.mark.parametrize(
"per_stream_enabled",
[
pytest.param(False, id="test_source_emits_state_as_per_stream_format"),
],
)
def test_emit_non_records(self, mocker, per_stream_enabled):
def test_emit_non_records(self, mocker):
"""
Tests that an incremental read which uses slices and a checkpoint interval:
1. outputs all records
@@ -1129,7 +1143,7 @@ class TestIncrementalRead:
return_value=2,
)
src = MockSource(streams=[stream_1, stream_2], per_stream=per_stream_enabled)
src = MockSource(streams=[stream_1, stream_2])
catalog = ConfiguredAirbyteCatalog(
streams=[
_configured_stream(stream_1, SyncMode.incremental),
@@ -1145,17 +1159,17 @@ class TestIncrementalRead:
stream_data_to_airbyte_message("s1", stream_output[0]),
stream_data_to_airbyte_message("s1", stream_output[1]),
stream_data_to_airbyte_message("s1", stream_output[2]),
_as_state({"s1": state}, "s1", state) if per_stream_enabled else _as_state({"s1": state}),
_as_state("s1", state),
stream_data_to_airbyte_message("s1", stream_output[3]),
_as_state({"s1": state}, "s1", state) if per_stream_enabled else _as_state({"s1": state}),
_as_state("s1", state),
# stream 1 slice 2
stream_data_to_airbyte_message("s1", stream_output[0]),
_as_state({"s1": state}, "s1", state) if per_stream_enabled else _as_state({"s1": state}),
_as_state("s1", state),
stream_data_to_airbyte_message("s1", stream_output[1]),
stream_data_to_airbyte_message("s1", stream_output[2]),
stream_data_to_airbyte_message("s1", stream_output[3]),
_as_state({"s1": state}, "s1", state) if per_stream_enabled else _as_state({"s1": state}),
_as_state({"s1": state}, "s1", state) if per_stream_enabled else _as_state({"s1": state}),
_as_state("s1", state),
_as_state("s1", state),
_as_stream_status("s1", AirbyteStreamStatus.COMPLETE),
# stream 2 slice 1
_as_stream_status("s2", AirbyteStreamStatus.STARTED),
@@ -1163,17 +1177,17 @@ class TestIncrementalRead:
stream_data_to_airbyte_message("s2", stream_output[0]),
stream_data_to_airbyte_message("s2", stream_output[1]),
stream_data_to_airbyte_message("s2", stream_output[2]),
_as_state({"s1": state, "s2": state}, "s2", state) if per_stream_enabled else _as_state({"s1": state, "s2": state}),
_as_state("s2", state),
stream_data_to_airbyte_message("s2", stream_output[3]),
_as_state({"s1": state, "s2": state}, "s2", state) if per_stream_enabled else _as_state({"s1": state, "s2": state}),
_as_state("s2", state),
# stream 2 slice 2
stream_data_to_airbyte_message("s2", stream_output[0]),
_as_state({"s1": state, "s2": state}, "s2", state) if per_stream_enabled else _as_state({"s1": state, "s2": state}),
_as_state("s2", state),
stream_data_to_airbyte_message("s2", stream_output[1]),
stream_data_to_airbyte_message("s2", stream_output[2]),
stream_data_to_airbyte_message("s2", stream_output[3]),
_as_state({"s1": state, "s2": state}, "s2", state) if per_stream_enabled else _as_state({"s1": state, "s2": state}),
_as_state({"s1": state, "s2": state}, "s2", state) if per_stream_enabled else _as_state({"s1": state, "s2": state}),
_as_state("s2", state),
_as_state("s2", state),
_as_stream_status("s2", AirbyteStreamStatus.COMPLETE),
]
)
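# The docstring above describes an incremental read that uses slices and a checkpoint interval
# (the return_value=2 patch above). A very rough sketch of that shape, illustrative only; the real
# CDK read loop and the exact points at which STATE messages are expected here are more involved:
def read_slice_with_checkpoints(slice_messages, checkpoint_interval, emit_state):
    for count, message in enumerate(slice_messages, start=1):
        yield message
        if count % checkpoint_interval == 0:
            yield emit_state()  # periodic checkpoint while reading the slice
    yield emit_state()          # checkpoint again once the slice is exhausted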
@@ -1200,14 +1214,12 @@ def test_checkpoint_state_from_stream_instance():
# The stream_state passed to checkpoint_state() should be ignored since stream implements state function
teams_stream.state = {"updated_at": "2022-09-11"}
actual_message = teams_stream._checkpoint_state({"ignored": "state"}, state_manager, True)
assert actual_message == _as_state({"teams": {"updated_at": "2022-09-11"}}, "teams", {"updated_at": "2022-09-11"})
actual_message = teams_stream._checkpoint_state({"ignored": "state"}, state_manager)
assert actual_message == _as_state("teams", {"updated_at": "2022-09-11"})
# The stream_state passed to checkpoint_state() should be used since the stream does not implement state function
actual_message = managers_stream._checkpoint_state({"updated": "expected_here"}, state_manager, True)
assert actual_message == _as_state(
{"teams": {"updated_at": "2022-09-11"}, "managers": {"updated": "expected_here"}}, "managers", {"updated": "expected_here"}
)
actual_message = managers_stream._checkpoint_state({"updated": "expected_here"}, state_manager)
assert actual_message == _as_state("managers", {"updated": "expected_here"})
@pytest.mark.parametrize(
@@ -1382,9 +1394,9 @@ def test_continue_sync_with_failed_streams_with_override_false(mocker):
the sync when one stream fails with an error.
"""
stream_output = [{"k1": "v1"}, {"k2": "v2"}]
s1 = MockStream([({"sync_mode": SyncMode.full_refresh}, stream_output)], name="s1")
s1 = MockStream([({"stream_state": {}, "sync_mode": SyncMode.full_refresh}, stream_output)], name="s1")
s2 = StreamRaisesException(AirbyteTracedException(message="I was born only to crash like Icarus"))
s3 = MockStream([({"sync_mode": SyncMode.full_refresh}, stream_output)], name="s3")
s3 = MockStream([({"stream_state": {}, "sync_mode": SyncMode.full_refresh}, stream_output)], name="s3")
mocker.patch.object(MockStream, "get_json_schema", return_value={})
mocker.patch.object(StreamRaisesException, "get_json_schema", return_value={})

View File

@@ -284,61 +284,6 @@ def test_get_stream_state(input_state, stream_name, namespace, expected_state):
assert actual_state == expected_state
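# get_stream_state is the lookup the consolidated read path relies on: it returns the stored state
# for a (stream name, namespace) pair as a plain mapping, or an empty one when nothing is stored.
# A small usage sketch, assuming the same constructor shape used throughout these tests:
from airbyte_cdk.models import (
    AirbyteStateBlob, AirbyteStateMessage, AirbyteStateType, AirbyteStreamState, StreamDescriptor,
)
from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager

state_manager = ConnectorStateManager(
    {},
    [
        AirbyteStateMessage(
            type=AirbyteStateType.STREAM,
            stream=AirbyteStreamState(
                stream_descriptor=StreamDescriptor(name="actresses", namespace="public"),
                stream_state=AirbyteStateBlob.parse_obj({"id": "seehorn_rhea"}),
            ),
        )
    ],
)
assert state_manager.get_stream_state("actresses", "public") == {"id": "seehorn_rhea"}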
@pytest.mark.parametrize(
"input_state, expected_legacy_state, expected_error",
[
pytest.param(
[AirbyteStateMessage(type=AirbyteStateType.LEGACY, data={"actresses": {"id": "seehorn_rhea"}})],
{"actresses": {"id": "seehorn_rhea"}},
does_not_raise(),
id="test_get_legacy_legacy_state_message",
),
pytest.param(
[
AirbyteStateMessage(
type=AirbyteStateType.STREAM,
stream=AirbyteStreamState(
stream_descriptor=StreamDescriptor(name="actresses", namespace="public"),
stream_state=AirbyteStateBlob.parse_obj({"id": "seehorn_rhea"}),
),
)
],
{"actresses": {"id": "seehorn_rhea"}},
does_not_raise(),
id="test_get_legacy_from_stream_state",
),
pytest.param(
{
"actors": {"created_at": "1962-10-22"},
"actresses": {"id": "seehorn_rhea"},
},
{"actors": {"created_at": "1962-10-22"}, "actresses": {"id": "seehorn_rhea"}},
does_not_raise(),
id="test_get_legacy_from_legacy_state_blob",
),
pytest.param(
[
AirbyteStateMessage(
type=AirbyteStateType.STREAM,
stream=AirbyteStreamState(
stream_descriptor=StreamDescriptor(name="actresses", namespace="public"),
stream_state=None,
),
)
],
{"actresses": {}},
does_not_raise(),
id="test_get_legacy_from_stream_state",
),
],
)
def test_get_legacy_state(input_state, expected_legacy_state, expected_error):
with expected_error:
state_manager = ConnectorStateManager({}, input_state)
actual_legacy_state = state_manager._get_legacy_state()
assert actual_legacy_state == expected_legacy_state
def test_get_state_returns_deep_copy():
input_state = [
AirbyteStateMessage(
@@ -422,11 +367,10 @@ def test_update_state_for_stream(start_state, update_name, update_namespace, upd
assert state_manager.per_stream_states[
HashableStreamDescriptor(name=update_name, namespace=update_namespace)
] == AirbyteStateBlob.parse_obj(update_value)
assert state_manager._get_legacy_state() == expected_legacy_state
@pytest.mark.parametrize(
"start_state, update_name, update_namespace, send_per_stream, expected_state_message",
"start_state, update_name, update_namespace, expected_state_message",
[
pytest.param(
[
@@ -447,7 +391,6 @@ def test_update_state_for_stream(start_state, update_name, update_namespace, upd
],
"episodes",
"public",
True,
AirbyteMessage(
type=MessageType.STATE,
state=AirbyteStateMessage(
@@ -456,7 +399,6 @@ def test_update_state_for_stream(start_state, update_name, update_namespace, upd
stream_descriptor=StreamDescriptor(name="episodes", namespace="public"),
stream_state=AirbyteStateBlob.parse_obj({"created_at": "2022_05_22"}),
),
data={"episodes": {"created_at": "2022_05_22"}, "seasons": {"id": 1}},
),
),
id="test_emit_state_message_with_stream_and_legacy",
@@ -473,7 +415,6 @@ def test_update_state_for_stream(start_state, update_name, update_namespace, upd
],
"episodes",
"public",
True,
AirbyteMessage(
type=MessageType.STATE,
state=AirbyteStateMessage(
@@ -482,7 +423,6 @@ def test_update_state_for_stream(start_state, update_name, update_namespace, upd
stream_descriptor=StreamDescriptor(name="episodes", namespace="public"),
stream_state=AirbyteStateBlob(),
),
data={"episodes": {}},
),
),
id="test_always_emit_message_with_stream_state_blob",
@@ -499,7 +439,6 @@ def test_update_state_for_stream(start_state, update_name, update_namespace, upd
],
"missing",
"public",
True,
AirbyteMessage(
type=MessageType.STATE,
state=AirbyteStateMessage(
@@ -507,7 +446,6 @@ def test_update_state_for_stream(start_state, update_name, update_namespace, upd
stream=AirbyteStreamState(
stream_descriptor=StreamDescriptor(name="missing", namespace="public"), stream_state=AirbyteStateBlob()
),
data={"episodes": {"id": 507}},
),
),
id="test_emit_state_nonexistent_stream_name",
@@ -524,7 +462,6 @@ def test_update_state_for_stream(start_state, update_name, update_namespace, upd
],
"episodes",
"nonexistent",
True,
AirbyteMessage(
type=MessageType.STATE,
state=AirbyteStateMessage(
@@ -532,72 +469,14 @@ def test_update_state_for_stream(start_state, update_name, update_namespace, upd
stream=AirbyteStreamState(
stream_descriptor=StreamDescriptor(name="episodes", namespace="nonexistent"), stream_state=AirbyteStateBlob()
),
data={"episodes": {"id": 507}},
),
),
id="test_emit_state_wrong_namespace",
),
pytest.param(
[
AirbyteStateMessage(
type=AirbyteStateType.STREAM,
stream=AirbyteStreamState(
stream_descriptor=StreamDescriptor(name="episodes", namespace=None),
stream_state=AirbyteStateBlob.parse_obj({"created_at": "2022_05_22"}),
),
),
AirbyteStateMessage(
type=AirbyteStateType.STREAM,
stream=AirbyteStreamState(
stream_descriptor=StreamDescriptor(name="seasons", namespace=None),
stream_state=AirbyteStateBlob.parse_obj({"id": 1}),
),
),
],
"episodes",
"",
False,
AirbyteMessage(
type=MessageType.STATE,
state=AirbyteStateMessage(
data={"episodes": {"created_at": "2022_05_22"}, "seasons": {"id": 1}},
),
),
id="test_emit_legacy_state_format",
),
],
)
def test_create_state_message(start_state, update_name, update_namespace, send_per_stream, expected_state_message):
def test_create_state_message(start_state, update_name, update_namespace, expected_state_message):
state_manager = ConnectorStateManager({}, start_state)
actual_state_message = state_manager.create_state_message(
stream_name=update_name, namespace=update_namespace, send_per_stream_state=send_per_stream
)
actual_state_message = state_manager.create_state_message(stream_name=update_name, namespace=update_namespace)
assert actual_state_message == expected_state_message
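# With the send_per_stream_state flag gone, the call exercised above reduces to just a stream name
# and namespace. A minimal usage sketch, assuming an empty starting state (so the emitted blob is
# empty, mirroring the nonexistent-stream cases above):
from airbyte_cdk.models import AirbyteStateBlob, Type as MessageType
from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager

state_manager = ConnectorStateManager({}, [])
message = state_manager.create_state_message(stream_name="episodes", namespace="public")
assert message.type == MessageType.STATE
assert message.state.stream.stream_descriptor.name == "episodes"
assert message.state.stream.stream_state == AirbyteStateBlob()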
def test_do_not_set_stream_descriptor_namespace_when_none():
"""
This is a very specific test to ensure that a None namespace value is never set and emitted back to the platform.
The platform validates the state message sent by the connector, and namespace must either be a string or not be
included at all. A None value registers as null on the platform, which is not valid input. We can verify that unset
fields on a pydantic model are excluded by serializing with the exclude_unset parameter.
"""
expected_stream_state_descriptor = {"name": "episodes"}
state_manager = ConnectorStateManager(
{},
[
AirbyteStateMessage(
type=AirbyteStateType.STREAM,
stream=AirbyteStreamState(
stream_descriptor=StreamDescriptor(name="episodes"),
stream_state=None,
),
),
],
)
actual_state_message = state_manager.create_state_message(stream_name="episodes", namespace=None, send_per_stream_state=True)
assert actual_state_message.state.stream.stream_descriptor.dict(exclude_unset=True) == expected_stream_state_descriptor
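# The exclude_unset behaviour relied on above can be checked directly on the pydantic model: a
# field that was never passed to the constructor is omitted from the serialized dict, so a None
# namespace is never sent back to the platform (illustrative check mirroring the assertion above):
from airbyte_cdk.models import StreamDescriptor

descriptor = StreamDescriptor(name="episodes")
assert descriptor.dict(exclude_unset=True) == {"name": "episodes"}
assert "namespace" not in descriptor.dict(exclude_unset=True)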

View File

@@ -365,8 +365,8 @@ def test_internal_config(abstract_source, catalog):
# Test with empty config
logger = logging.getLogger(f"airbyte.{getattr(abstract_source, 'name', '')}")
records = [r for r in abstract_source.read(logger=logger, config={}, catalog=catalog, state={})]
# 3 for http stream, 3 for non http stream and 3 for stream status messages for each stream (2x)
assert len(records) == 3 + 3 + 3 + 3
# 3 for http stream, 3 for non http stream, 1 for state message for each stream (2x) and 3 for stream status messages for each stream (2x)
assert len(records) == 3 + 3 + 1 + 1 + 3 + 3
assert http_stream.read_records.called
assert non_http_stream.read_records.called
# Make sure page_size hasn't been set
@@ -375,21 +375,21 @@ def test_internal_config(abstract_source, catalog):
# Test with records limit set to 1
internal_config = {"some_config": 100, "_limit": 1}
records = [r for r in abstract_source.read(logger=logger, config=internal_config, catalog=catalog, state={})]
# 1 from http stream + 1 from non http stream and 3 for stream status messages for each stream (2x)
assert len(records) == 1 + 1 + 3 + 3
# 1 from http stream + 1 from non http stream, 1 for state message for each stream (2x) and 3 for stream status messages for each stream (2x)
assert len(records) == 1 + 1 + 1 + 1 + 3 + 3
assert "_limit" not in abstract_source.streams_config
assert "some_config" in abstract_source.streams_config
# Test with records limit set to a number that exceeds the expected records
internal_config = {"some_config": 100, "_limit": 20}
records = [r for r in abstract_source.read(logger=logger, config=internal_config, catalog=catalog, state={})]
assert len(records) == 3 + 3 + 3 + 3
assert len(records) == 3 + 3 + 1 + 1 + 3 + 3
# Check if the page_size parameter is set on the http instance only
internal_config = {"some_config": 100, "_page_size": 2}
records = [r for r in abstract_source.read(logger=logger, config=internal_config, catalog=catalog, state={})]
assert "_page_size" not in abstract_source.streams_config
assert "some_config" in abstract_source.streams_config
assert len(records) == 3 + 3 + 3 + 3
assert len(records) == 3 + 3 + 1 + 1 + 3 + 3
assert http_stream.page_size == 2
# Make sure page_size hasn't been set for non http streams
assert not non_http_stream.page_size
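# The updated totals above all follow the same per-stream accounting: every record, plus the one
# final STATE message per stream introduced by this change, plus the three STARTED/RUNNING/COMPLETE
# status traces. A small helper makes the arithmetic explicit (expected_message_count is a
# hypothetical name for illustration, not part of the test suite):
def expected_message_count(records_per_stream: int, stream_count: int = 2) -> int:
    state_messages = 1  # one final state message per stream
    status_traces = 3   # STARTED, RUNNING and COMPLETE trace messages per stream
    return stream_count * (records_per_stream + state_messages + status_traces)

assert expected_message_count(3) == 3 + 3 + 1 + 1 + 3 + 3  # the unlimited read above
assert expected_message_count(1) == 1 + 1 + 1 + 1 + 3 + 3  # the _limit=1 read above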
@@ -403,6 +403,7 @@ def test_internal_config_limit(mocker, abstract_source, catalog):
SLICE_DEBUG_LOG_COUNT = 1
FULL_RECORDS_NUMBER = 3
TRACE_STATUS_COUNT = 3
STATE_COUNT = 1
streams = abstract_source.streams(None)
http_stream = streams[0]
http_stream.read_records.return_value = [{}] * FULL_RECORDS_NUMBER
@@ -410,7 +411,7 @@ def test_internal_config_limit(mocker, abstract_source, catalog):
catalog.streams[0].sync_mode = SyncMode.full_refresh
records = [r for r in abstract_source.read(logger=logger_mock, config=internal_config, catalog=catalog, state={})]
assert len(records) == STREAM_LIMIT + SLICE_DEBUG_LOG_COUNT + TRACE_STATUS_COUNT
assert len(records) == STREAM_LIMIT + SLICE_DEBUG_LOG_COUNT + TRACE_STATUS_COUNT + STATE_COUNT
logger_info_args = [call[0][0] for call in logger_mock.info.call_args_list]
# Check if log line matches number of limit
read_log_record = [_l for _l in logger_info_args if _l.startswith("Read")]
@@ -440,6 +441,7 @@ SCHEMA = {"type": "object", "properties": {"value": {"type": "string"}}}
def test_source_config_no_transform(mocker, abstract_source, catalog):
SLICE_DEBUG_LOG_COUNT = 1
TRACE_STATUS_COUNT = 3
STATE_COUNT = 1
logger_mock = mocker.MagicMock()
logger_mock.level = logging.DEBUG
streams = abstract_source.streams(None)
@@ -447,7 +449,7 @@ def test_source_config_no_transform(mocker, abstract_source, catalog):
http_stream.get_json_schema.return_value = non_http_stream.get_json_schema.return_value = SCHEMA
http_stream.read_records.return_value, non_http_stream.read_records.return_value = [[{"value": 23}] * 5] * 2
records = [r for r in abstract_source.read(logger=logger_mock, config={}, catalog=catalog, state={})]
assert len(records) == 2 * (5 + SLICE_DEBUG_LOG_COUNT + TRACE_STATUS_COUNT)
assert len(records) == 2 * (5 + SLICE_DEBUG_LOG_COUNT + TRACE_STATUS_COUNT + STATE_COUNT)
assert [r.record.data for r in records if r.type == Type.RECORD] == [{"value": 23}] * 2 * 5
assert http_stream.get_json_schema.call_count == 5
assert non_http_stream.get_json_schema.call_count == 5
@@ -458,6 +460,7 @@ def test_source_config_transform(mocker, abstract_source, catalog):
logger_mock.level = logging.DEBUG
SLICE_DEBUG_LOG_COUNT = 2
TRACE_STATUS_COUNT = 6
STATE_COUNT = 2
streams = abstract_source.streams(None)
http_stream, non_http_stream = streams
http_stream.transformer = TypeTransformer(TransformConfig.DefaultSchemaNormalization)
@@ -465,7 +468,7 @@ def test_source_config_transform(mocker, abstract_source, catalog):
http_stream.get_json_schema.return_value = non_http_stream.get_json_schema.return_value = SCHEMA
http_stream.read_records.return_value, non_http_stream.read_records.return_value = [{"value": 23}], [{"value": 23}]
records = [r for r in abstract_source.read(logger=logger_mock, config={}, catalog=catalog, state={})]
assert len(records) == 2 + SLICE_DEBUG_LOG_COUNT + TRACE_STATUS_COUNT
assert len(records) == 2 + SLICE_DEBUG_LOG_COUNT + TRACE_STATUS_COUNT + STATE_COUNT
assert [r.record.data for r in records if r.type == Type.RECORD] == [{"value": "23"}] * 2
@@ -474,13 +477,14 @@ def test_source_config_transform_and_no_transform(mocker, abstract_source, catal
logger_mock.level = logging.DEBUG
SLICE_DEBUG_LOG_COUNT = 2
TRACE_STATUS_COUNT = 6
STATE_COUNT = 2
streams = abstract_source.streams(None)
http_stream, non_http_stream = streams
http_stream.transformer = TypeTransformer(TransformConfig.DefaultSchemaNormalization)
http_stream.get_json_schema.return_value = non_http_stream.get_json_schema.return_value = SCHEMA
http_stream.read_records.return_value, non_http_stream.read_records.return_value = [{"value": 23}], [{"value": 23}]
records = [r for r in abstract_source.read(logger=logger_mock, config={}, catalog=catalog, state={})]
assert len(records) == 2 + SLICE_DEBUG_LOG_COUNT + TRACE_STATUS_COUNT
assert len(records) == 2 + SLICE_DEBUG_LOG_COUNT + TRACE_STATUS_COUNT + STATE_COUNT
assert [r.record.data for r in records if r.type == Type.RECORD] == [{"value": "23"}, {"value": 23}]
@@ -526,8 +530,8 @@ def test_read_default_http_availability_strategy_stream_available(catalog, mocke
source = MockAbstractSource(streams=streams)
logger = logging.getLogger(f"airbyte.{getattr(abstract_source, 'name', '')}")
records = [r for r in source.read(logger=logger, config={}, catalog=catalog, state={})]
# 3 for http stream, 3 for non http stream and 3 for stream status messages for each stream (2x)
assert len(records) == 3 + 3 + 3 + 3
# 3 for http stream, 3 for non http stream, 1 for state message for each stream (2x) and 3 for stream status messages for each stream (2x)
assert len(records) == 3 + 3 + 1 + 1 + 3 + 3
assert http_stream.read_records.called
assert non_http_stream.read_records.called
@@ -584,8 +588,8 @@ def test_read_default_http_availability_strategy_stream_unavailable(catalog, moc
with caplog.at_level(logging.WARNING):
records = [r for r in source.read(logger=logger, config={}, catalog=catalog, state={})]
# 0 for http stream, 3 for non http stream and 3 status trace messages
assert len(records) == 0 + 3 + 3
# 0 for http stream, 3 for non http stream, 1 for non http stream state message and 3 status trace messages
assert len(records) == 0 + 3 + 1 + 3
assert non_http_stream.read_records.called
expected_logs = [
f"Skipped syncing stream '{http_stream.name}' because it was unavailable.",