1
0
mirror of synced 2025-12-23 21:03:15 -05:00

CAT: fix incremental by running tests per stream (#36814)

This commit is contained in:
Roman Yermilov [GL]
2024-06-03 13:55:11 +02:00
committed by GitHub
parent e64d3b597a
commit 1f325ec6e6
2 changed files with 330 additions and 383 deletions

View File

@@ -4,16 +4,26 @@
import json
from pathlib import Path
from typing import Any, Dict, List, Mapping, MutableMapping, Tuple, Union
from typing import Any, Dict, Iterator, List, Mapping, MutableMapping, Optional, Tuple, Union
import pytest
from airbyte_protocol.models import AirbyteMessage, AirbyteStateMessage, AirbyteStateType, ConfiguredAirbyteCatalog, SyncMode, Type
from airbyte_protocol.models import (
AirbyteMessage,
AirbyteStateMessage,
AirbyteStateStats,
AirbyteStateType,
ConfiguredAirbyteCatalog,
SyncMode,
Type,
)
from connector_acceptance_test import BaseTest
from connector_acceptance_test.config import Config, EmptyStreamConfiguration, IncrementalConfig
from connector_acceptance_test.utils import ConnectorRunner, SecretDict, filter_output, incremental_only_catalog
from connector_acceptance_test.utils.timeouts import TWENTY_MINUTES
from deepdiff import DeepDiff
MIN_BATCHES_TO_TEST: int = 5
@pytest.fixture(name="future_state_configuration")
def future_state_configuration_fixture(inputs, base_path, test_strictness_level) -> Tuple[Path, List[EmptyStreamConfiguration]]:
@@ -170,70 +180,54 @@ class TestIncremental(BaseTest):
pytest.skip("Skipping new incremental test based on acceptance-test-config.yml")
return
output_1 = await docker_runner.call_read(connector_config, configured_catalog_for_incremental)
records_1 = filter_output(output_1, type_=Type.RECORD)
states_1 = filter_output(output_1, type_=Type.STATE)
for stream in configured_catalog_for_incremental.streams:
configured_catalog_for_incremental_per_stream = ConfiguredAirbyteCatalog(streams=[stream])
# We sometimes have duplicate identical state messages in a stream which we can filter out to speed things up
unique_state_messages = [message for index, message in enumerate(states_1) if message not in states_1[:index]]
output_1 = await docker_runner.call_read(connector_config, configured_catalog_for_incremental_per_stream)
# Important!
# There is only a small subset of assertions we can make
# in the absense of enforcing that all connectors return 3 or more state messages
# during the first read.
# To learn more: https://github.com/airbytehq/airbyte/issues/29926
if len(unique_state_messages) < 3:
pytest.skip("Skipping test because there are not enough state messages to test with")
return
assert records_1, "First Read should produce at least one record"
# For legacy state format, the final state message contains the final state of all streams. For per-stream state format,
# the complete final state of streams must be assembled by going through all prior state messages received
is_per_stream = is_per_stream_state(states_1[-1])
# To avoid spamming APIs we only test a fraction of batches (10%) and enforce a minimum of 10 tested
min_batches_to_test = 5
sample_rate = len(unique_state_messages) // min_batches_to_test
mutating_stream_name_to_per_stream_state = dict()
for idx, state_message in enumerate(unique_state_messages):
assert state_message.type == Type.STATE
# if first state message, skip
# this is because we cannot assert if the first state message will result in new records
# as in this case it is possible for a connector to return an empty state message when it first starts.
# e.g. if the connector decides it wants to let the caller know that it has started with an empty state.
if idx == 0:
records_1 = filter_output(output_1, type_=Type.RECORD)
# If the output of a full read is empty, there is no reason to iterate over its state.
# So, reading from any checkpoint of an empty stream will also produce nothing.
if len(records_1) == 0:
continue
# if last state message, skip
# this is because we cannot assert if the last state message will result in new records
# as in this case it is possible for a connector to return a previous state message.
# e.g. if the connector is using pagination and the last page is only partially full
if idx == len(unique_state_messages) - 1:
states_1 = filter_output(output_1, type_=Type.STATE)
# To learn more: https://github.com/airbytehq/airbyte/issues/29926
if len(states_1) == 0:
continue
# if batching required, and not a sample, skip
if len(unique_state_messages) >= min_batches_to_test and idx % sample_rate != 0:
continue
states_with_expected_record_count = self._state_messages_selector(states_1)
if not states_with_expected_record_count:
pytest.fail(
"Unable to test because there is no suitable state checkpoint, likely due to a zero record count in the states."
)
state_input, mutating_stream_name_to_per_stream_state = self.get_next_state_input(
state_message, mutating_stream_name_to_per_stream_state, is_per_stream
)
mutating_stream_name_to_per_stream_state = dict()
output_N = await docker_runner.call_read_with_state(connector_config, configured_catalog_for_incremental, state=state_input)
records_N = filter_output(output_N, type_=Type.RECORD)
assert (
records_N
), f"Read {idx + 2} of {len(unique_state_messages)} should produce at least one record.\n\n state: {state_input} \n\n records_{idx + 2}: {records_N}"
for idx, state_message_data in enumerate(states_with_expected_record_count):
state_message, expected_records_count = state_message_data
assert state_message.type == Type.STATE
diff = naive_diff_records(records_1, records_N)
assert (
diff
), f"Records for subsequent reads with new state should be different.\n\n records_1: {records_1} \n\n state: {state_input} \n\n records_{idx + 2}: {records_N} \n\n diff: {diff}"
state_input, mutating_stream_name_to_per_stream_state = self.get_next_state_input(
state_message, mutating_stream_name_to_per_stream_state
)
output_N = await docker_runner.call_read_with_state(
connector_config, configured_catalog_for_incremental_per_stream, state=state_input
)
records_N = filter_output(output_N, type_=Type.RECORD)
assert (
# We assume that the output may be empty when we read the latest state, or it must produce some data if we are in the middle of our progression
len(records_N)
>= expected_records_count
), f"Read {idx + 1} of {len(states_with_expected_record_count)} should produce at least one record.\n\n state: {state_input} \n\n records_{idx + 1}: {records_N}"
diff = naive_diff_records(records_1, records_N)
assert (
diff
), f"Records for subsequent reads with new state should be different.\n\n records_1: {records_1} \n\n state: {state_input} \n\n records_{idx + 1}: {records_N} \n\n diff: {diff}"
async def test_state_with_abnormally_large_values(
self, connector_config, configured_catalog, future_state, docker_runner: ConnectorRunner
@@ -249,25 +243,116 @@ class TestIncremental(BaseTest):
assert states, "The sync should produce at least one STATE message"
def get_next_state_input(
self,
state_message: AirbyteStateMessage,
stream_name_to_per_stream_state: MutableMapping,
is_per_stream,
self, state_message: AirbyteStateMessage, stream_name_to_per_stream_state: MutableMapping
) -> Tuple[Union[List[MutableMapping], MutableMapping], MutableMapping]:
if is_per_stream:
# Including all the latest state values from previous batches, update the combined stream state
# with the current batch's stream state and then use it in the following read() request
current_state = state_message.state
if current_state and current_state.type == AirbyteStateType.STREAM:
per_stream = current_state.stream
if per_stream.stream_state:
stream_name_to_per_stream_state[per_stream.stream_descriptor.name] = (
per_stream.stream_state.dict() if per_stream.stream_state else {}
)
state_input = [
{"type": "STREAM", "stream": {"stream_descriptor": {"name": stream_name}, "stream_state": stream_state}}
for stream_name, stream_state in stream_name_to_per_stream_state.items()
]
return state_input, stream_name_to_per_stream_state
else:
return state_message.state.data, state_message.state.data
# Including all the latest state values from previous batches, update the combined stream state
# with the current batch's stream state and then use it in the following read() request
current_state = state_message.state
if current_state and current_state.type == AirbyteStateType.STREAM:
per_stream = current_state.stream
if per_stream.stream_state:
stream_name_to_per_stream_state[per_stream.stream_descriptor.name] = (
per_stream.stream_state.dict() if per_stream.stream_state else {}
)
state_input = [
{"type": "STREAM", "stream": {"stream_descriptor": {"name": stream_name}, "stream_state": stream_state}}
for stream_name, stream_state in stream_name_to_per_stream_state.items()
]
return state_input, stream_name_to_per_stream_state
@staticmethod
def _get_state(airbyte_message: AirbyteMessage) -> AirbyteStateMessage:
if not airbyte_message.state.stream:
return airbyte_message.state
return airbyte_message.state.stream.stream_state
@staticmethod
def _get_record_count(airbyte_message: AirbyteMessage) -> float:
return airbyte_message.state.sourceStats.recordCount
def _get_unique_state_messages_with_record_count(self, states: List[AirbyteMessage]) -> List[Tuple[AirbyteMessage, float]]:
"""
Validates a list of state messages to ensure that consecutive messages with the same stream state are represented by only the first message, while subsequent duplicates are ignored.
"""
if len(states) <= 1:
return [(state, 0.0) for state in states if self._get_record_count(state)]
current_idx = 0
unique_state_messages = []
# Iterate through the list of state messages
while current_idx < len(states) - 1:
next_idx = current_idx + 1
# Check if consecutive messages have the same stream state
while self._get_state(states[current_idx]) == self._get_state(states[next_idx]) and next_idx < len(states) - 1:
next_idx += 1
states[current_idx].state.sourceStats = AirbyteStateStats(
recordCount=sum(map(self._get_record_count, states[current_idx:next_idx]))
)
# Append the first message with a unique stream state to the result list
unique_state_messages.append(states[current_idx])
# If the last message has a different stream state than the previous one, append it to the result list
if next_idx == len(states) - 1 and self._get_state(states[current_idx]) != self._get_state(states[next_idx]):
unique_state_messages.append(states[next_idx])
current_idx = next_idx
# Drop all states with a record count of 0.0
unique_non_zero_state_messages = list(filter(self._get_record_count, unique_state_messages))
total_record_count = sum(map(self._get_record_count, unique_non_zero_state_messages))
# Calculates the expected record count per state based on the total record count and distribution across states.
# The expected record count is the number of records we expect to receive when applying a specific state checkpoint.
unique_non_zero_state_messages_with_record_count = zip(
unique_non_zero_state_messages,
[
total_record_count - sum(map(self._get_record_count, unique_non_zero_state_messages[: idx + 1]))
for idx in range(len(unique_non_zero_state_messages))
],
)
return list(unique_non_zero_state_messages_with_record_count)
def _states_with_expected_record_count_batch_selector(
self, unique_state_messages_with_record_count: List[Tuple[AirbyteMessage, float]]
) -> List[Tuple[AirbyteMessage, float]]:
# Important!
# There is only a small subset of assertions we can make
# in the absense of enforcing that all connectors return 3 or more state messages
# during the first read.
if len(unique_state_messages_with_record_count) < 3:
return unique_state_messages_with_record_count[-1:]
# To avoid spamming APIs we only test a fraction of batches (4 or 5 states by default)
sample_rate = (len(unique_state_messages_with_record_count) // MIN_BATCHES_TO_TEST) or 1
states_with_expected_record_count_batch = []
for idx, state_message_data in enumerate(unique_state_messages_with_record_count):
# if first state message, skip
# this is because we cannot assert if the first state message will result in new records
# as in this case it is possible for a connector to return an empty state message when it first starts.
# e.g. if the connector decides it wants to let the caller know that it has started with an empty state.
if idx == 0:
continue
# if batching required, and not a sample, skip
if idx % sample_rate != 0:
continue
# if last state message, skip
# this is because we cannot assert if the last state message will result in new records
# as in this case it is possible for a connector to return a previous state message.
# e.g. if the connector is using pagination and the last page is only partially full
if idx == len(unique_state_messages_with_record_count) - 1:
continue
states_with_expected_record_count_batch.append(state_message_data)
return states_with_expected_record_count_batch
def _state_messages_selector(self, state_messages: List[AirbyteMessage]) -> List[Tuple[AirbyteMessage, float]]:
unique_state_messages_with_record_count = self._get_unique_state_messages_with_record_count(state_messages)
return self._states_with_expected_record_count_batch_selector(unique_state_messages_with_record_count)

View File

@@ -3,19 +3,19 @@
#
import json
import operator
from contextlib import nullcontext as does_not_raise
from datetime import datetime
from pathlib import Path
from typing import Any, Optional
from unittest.mock import MagicMock, patch
import pendulum
import pytest
from airbyte_protocol.models import (
AirbyteMessage,
AirbyteRecordMessage,
AirbyteStateBlob,
AirbyteStateMessage,
AirbyteStateStats,
AirbyteStateType,
AirbyteStream,
AirbyteStreamState,
@@ -49,15 +49,21 @@ def build_state_message(state: dict) -> AirbyteMessage:
def build_per_stream_state_message(
descriptor: StreamDescriptor, stream_state: Optional[dict[str, Any]], data: Optional[dict[str, Any]] = None
descriptor: StreamDescriptor, stream_state: Optional[dict[str, Any]], data: Optional[dict[str, Any]] = None, source_stats: Optional[dict[str, Any]] = None
) -> AirbyteMessage:
if data is None:
data = stream_state
if source_stats is None:
source_stats = {"recordCount": 0.0}
stream_state_blob = AirbyteStateBlob.parse_obj(stream_state) if stream_state else None
return AirbyteMessage(
type=Type.STATE,
state=AirbyteStateMessage(
type=AirbyteStateType.STREAM, stream=AirbyteStreamState(stream_descriptor=descriptor, stream_state=stream_state_blob), data=data
type=AirbyteStateType.STREAM,
stream=AirbyteStreamState(stream_descriptor=descriptor, stream_state=stream_state_blob),
sourceStats=AirbyteStateStats(**source_stats),
data=data
),
)
@@ -178,73 +184,76 @@ async def test_incremental_two_sequential_reads(
@pytest.mark.parametrize(
"first_records, subsequent_records, expected_error",
"first_records, subsequent_records, inputs, expected_error",
[
pytest.param(
[
{"type": Type.STATE, "name": "test_stream", "stream_state": {}},
{"type": Type.RECORD, "name": "test_stream", "data": {"date": "2022-05-07"}},
{"type": Type.STATE, "name": "test_stream", "stream_state": {}, "sourceStats": {"recordCount": 0.0}},
{"type": Type.RECORD, "name": "test_stream", "data": {"date": "2022-05-08"}},
{"type": Type.STATE, "name": "test_stream", "stream_state": {"date": "2022-05-09"}},
{"type": Type.RECORD, "name": "test_stream", "data": {"date": "2022-05-09"}},
{"type": Type.RECORD, "name": "test_stream", "data": {"date": "2022-05-10"}},
{"type": Type.STATE, "name": "test_stream", "stream_state": {"date": "2022-05-11"}},
{"type": Type.RECORD, "name": "test_stream", "data": {"date": "2022-05-10"}},
{"type": Type.RECORD, "name": "test_stream", "data": {"date": "2022-05-11"}},
{"type": Type.RECORD, "name": "test_stream", "data": {"date": "2022-05-12"}},
{"type": Type.STATE, "name": "test_stream", "stream_state": {"date": "2022-05-13"}},
{"type": Type.STATE, "name": "test_stream", "stream_state": {"date": "2022-05-12"}, "sourceStats": {"recordCount": 3.0}},
],
[
[
{"type": Type.STATE, "name": "test_stream", "stream_state": {"date": "2022-05-09"}},
{"type": Type.RECORD, "name": "test_stream", "data": {"date": "2022-05-09"}},
{"type": Type.RECORD, "name": "test_stream", "data": {"date": "2022-05-10"}},
{"type": Type.STATE, "name": "test_stream", "stream_state": {"date": "2022-05-11"}},
{"type": Type.RECORD, "name": "test_stream", "data": {"date": "2022-05-11"}},
{"type": Type.RECORD, "name": "test_stream", "data": {"date": "2022-05-12"}},
{"type": Type.STATE, "name": "test_stream", "stream_state": {"date": "2022-05-13"}},
],
[
{"type": Type.STATE, "name": "test_stream", "stream_state": {"date": "2022-05-11"}},
{"type": Type.RECORD, "name": "test_stream", "data": {"date": "2022-05-11"}},
{"type": Type.RECORD, "name": "test_stream", "data": {"date": "2022-05-12"}},
{"type": Type.STATE, "name": "test_stream", "stream_state": {"date": "2022-05-13"}},
],
[
{"type": Type.STATE, "name": "test_stream", "stream_state": {"date": "2022-05-13"}},
],
[],
],
IncrementalConfig(),
does_not_raise(),
id="test_incremental_with_2_states",
id="test_incremental_with_amount_of_states_less_than_3_whith_latest_only_state_checked",
),
pytest.param(
[
{"type": Type.STATE, "name": "test_stream", "stream_state": {}},
{"type": Type.STATE, "name": "test_stream", "stream_state": {"date": "2022-05-09"}},
{"type": Type.STATE, "name": "test_stream", "stream_state": {"date": "2022-05-11"}},
{"type": Type.STATE, "name": "test_stream", "stream_state": {"date": "2022-05-13"}},
{"type": Type.STATE, "name": "test_stream", "stream_state": {}, "sourceStats": {"recordCount": 0.0}},
{"type": Type.RECORD, "name": "test_stream", "data": {"date": "2022-05-08"}},
{"type": Type.STATE, "name": "test_stream", "stream_state": {"date": "2022-05-08"}, "sourceStats": {"recordCount": 1.0}},
{"type": Type.RECORD, "name": "test_stream", "data": {"date": "2022-05-09"}},
{"type": Type.RECORD, "name": "test_stream", "data": {"date": "2022-05-10"}},
{"type": Type.STATE, "name": "test_stream", "stream_state": {"date": "2022-05-10"}, "sourceStats": {"recordCount": 2.0}},
{"type": Type.RECORD, "name": "test_stream", "data": {"date": "2022-05-10"}},
{"type": Type.RECORD, "name": "test_stream", "data": {"date": "2022-05-11"}},
{"type": Type.RECORD, "name": "test_stream", "data": {"date": "2022-05-12"}},
{"type": Type.STATE, "name": "test_stream", "stream_state": {"date": "2022-05-12"}, "sourceStats": {"recordCount": 3.0}},
],
[
# Read after 2022-05-08. The first state is expected to be skipped as empty. So, subsequent reads will start with the second state message.
[
{"type": Type.STATE, "name": "test_stream", "stream_state": {"date": "2022-05-09"}},
{"type": Type.RECORD, "name": "test_stream", "data": {"date": "2022-05-09"}},
{"type": Type.RECORD, "name": "test_stream", "data": {"date": "2022-05-10"}},
{"type": Type.STATE, "name": "test_stream", "stream_state": {"date": "2022-05-11"}},
{"type": Type.STATE, "name": "test_stream", "stream_state": {"date": "2022-05-10"}, "sourceStats": {"recordCount": 2.0}},
{"type": Type.RECORD, "name": "test_stream", "data": {"date": "2022-05-10"}},
{"type": Type.RECORD, "name": "test_stream", "data": {"date": "2022-05-11"}},
{"type": Type.RECORD, "name": "test_stream", "data": {"date": "2022-05-12"}},
{"type": Type.STATE, "name": "test_stream", "stream_state": {"date": "2022-05-13"}},
{"type": Type.STATE, "name": "test_stream", "stream_state": {"date": "2022-05-12"}, "sourceStats": {"recordCount": 3.0}},
],
# Read after 2022-05-10. This is the second (last) subsequent read.
[
{"type": Type.STATE, "name": "test_stream", "stream_state": {"date": "2022-05-11"}},
{"type": Type.STATE, "name": "test_stream", "stream_state": {"date": "2022-05-10"}, "sourceStats": {"recordCount": 2.0}},
{"type": Type.RECORD, "name": "test_stream", "data": {"date": "2022-05-10"}},
{"type": Type.RECORD, "name": "test_stream", "data": {"date": "2022-05-11"}},
{"type": Type.RECORD, "name": "test_stream", "data": {"date": "2022-05-12"}},
{"type": Type.STATE, "name": "test_stream", "stream_state": {"date": "2022-05-13"}},
],
[
{"type": Type.STATE, "name": "test_stream", "stream_state": {"date": "2022-05-13"}},
],
{"type": Type.STATE, "name": "test_stream", "stream_state": {"date": "2022-05-12"}, "sourceStats": {"recordCount": 3.0}},
]
],
pytest.raises(AssertionError, match="First Read should produce at least one record"),
id="test_incremental_no_record_on_first_read_raises_error",
IncrementalConfig(),
does_not_raise(),
id="test_incremental_with_4_states_with_state_variation_checked",
),
pytest.param(
[
{"type": Type.STATE, "name": "test_stream", "stream_state": {}, "sourceStats": {"recordCount": 0.0}},
{"type": Type.STATE, "name": "test_stream", "stream_state": {"date": "2022-05-08"}, "sourceStats": {"recordCount": 0.0}},
{"type": Type.STATE, "name": "test_stream", "stream_state": {"date": "2022-05-10"}, "sourceStats": {"recordCount": 0.0}},
{"type": Type.STATE, "name": "test_stream", "stream_state": {"date": "2022-05-12"}, "sourceStats": {"recordCount": 0.0}},
],
[
[]
],
IncrementalConfig(),
does_not_raise(),
id="test_incremental_no_records_on_first_read_skips_stream",
),
pytest.param(
[
@@ -256,222 +265,87 @@ async def test_incremental_two_sequential_reads(
{"type": Type.RECORD, "name": "test_stream", "data": {"date": "2022-05-12"}},
],
[
[
{"type": Type.STATE, "name": "test_stream", "stream_state": {"date": "2022-05-09"}},
{"type": Type.RECORD, "name": "test_stream", "data": {"date": "2022-05-09"}},
{"type": Type.RECORD, "name": "test_stream", "data": {"date": "2022-05-10"}},
{"type": Type.STATE, "name": "test_stream", "stream_state": {"date": "2022-05-11"}},
{"type": Type.RECORD, "name": "test_stream", "data": {"date": "2022-05-11"}},
{"type": Type.RECORD, "name": "test_stream", "data": {"date": "2022-05-12"}},
{"type": Type.STATE, "name": "test_stream", "stream_state": {"date": "2022-05-13"}},
],
[
{"type": Type.STATE, "name": "test_stream", "stream_state": {"date": "2022-05-11"}},
{"type": Type.RECORD, "name": "test_stream", "data": {"date": "2022-05-11"}},
{"type": Type.RECORD, "name": "test_stream", "data": {"date": "2022-05-12"}},
{"type": Type.STATE, "name": "test_stream", "stream_state": {"date": "2022-05-13"}},
],
[
{"type": Type.STATE, "name": "test_stream", "stream_state": {"date": "2022-05-13"}},
],
[]
],
pytest.raises(AssertionError, match="First Read should produce at least one state"),
id="test_incremental_no_state_on_first_read_raises_error",
IncrementalConfig(),
does_not_raise(),
id="test_incremental_no_states_on_first_read_skips_stream",
),
pytest.param(
[
{"type": Type.STATE, "name": "test_stream", "stream_state": {}},
{"type": Type.STATE, "name": "test_stream", "stream_state": {}, "sourceStats": {"recordCount": 0.0}},
{"type": Type.RECORD, "name": "test_stream", "data": {"date": "2022-05-07"}},
{"type": Type.RECORD, "name": "test_stream", "data": {"date": "2022-05-08"}},
{"type": Type.STATE, "name": "test_stream", "stream_state": {"date": "2022-05-09"}},
{"type": Type.STATE, "name": "test_stream", "stream_state": {"date": "2022-05-08"}, "sourceStats": {"recordCount": 2.0}},
{"type": Type.RECORD, "name": "test_stream", "data": {"date": "2022-05-12"}},
{"type": Type.RECORD, "name": "test_stream", "data": {"date": "2022-05-13"}},
{"type": Type.STATE, "name": "test_stream", "stream_state": {"date": "2022-05-11"}},
{"type": Type.STATE, "name": "test_stream", "stream_state": {"date": "2022-05-13"}, "sourceStats": {"recordCount": 2.0}},
],
[
[
{"type": Type.STATE, "name": "test_stream", "stream_state": {"date": "2022-05-09"}},
{"type": Type.STATE, "name": "test_stream", "stream_state": {"date": "2022-05-08"}, "sourceStats": {"recordCount": 2.0}},
{"type": Type.RECORD, "name": "test_stream", "data": {"date": "2022-05-12"}},
{"type": Type.RECORD, "name": "test_stream", "data": {"date": "2022-05-13"}},
{"type": Type.STATE, "name": "test_stream", "stream_state": {"date": "2022-05-11"}},
{"type": Type.STATE, "name": "test_stream", "stream_state": {"date": "2022-05-13"}, "sourceStats": {"recordCount": 2.0}},
],
[
{"type": Type.STATE, "name": "test_stream", "stream_state": {"date": "2022-05-11"}},
{"type": Type.STATE, "name": "test_stream", "stream_state": {"date": "2022-05-13"}, "sourceStats": {"recordCount": 2.0}},
],
],
IncrementalConfig(),
does_not_raise(),
id="test_first_incremental_only_younger_records",
),
pytest.param(
[
{"type": Type.STATE, "name": "test_stream", "stream_state": {}},
{"type": Type.RECORD, "name": "test_stream", "data": {"date": "2022-05-07"}},
{"type": Type.RECORD, "name": "test_stream", "data": {"date": "2022-05-08"}},
{"type": Type.STATE, "name": "test_stream", "stream_state": {"date": "2022-05-09"}},
{"type": Type.STATE, "name": "test_stream", "stream_state": {}, "sourceStats": {"recordCount": 0.0}},
{"type": Type.RECORD, "name": "test_stream", "data": {"date": "2022-05-04"}},
{"type": Type.RECORD, "name": "test_stream", "data": {"date": "2022-05-05"}},
{"type": Type.STATE, "name": "test_stream", "stream_state": {"date": "2022-05-11"}},
{"type": Type.STATE, "name": "test_stream", "stream_state": {"date": "2022-05-05"}, "sourceStats": {"recordCount": 2.0}},
{"type": Type.RECORD, "name": "test_stream", "data": {"date": "2022-05-07"}},
{"type": Type.RECORD, "name": "test_stream", "data": {"date": "2022-05-08"}},
{"type": Type.STATE, "name": "test_stream", "stream_state": {"date": "2022-05-08"}, "sourceStats": {"recordCount": 2.0}},
{"type": Type.RECORD, "name": "test_stream", "data": {"date": "2022-05-10"}},
{"type": Type.RECORD, "name": "test_stream", "data": {"date": "2022-05-11"}},
{"type": Type.RECORD, "name": "test_stream", "data": {"date": "2022-05-12"}},
{"type": Type.STATE, "name": "test_stream", "stream_state": {"date": "2022-05-13"}},
{"type": Type.STATE, "name": "test_stream", "stream_state": {"date": "2022-05-12"}, "sourceStats": {"recordCount": 3.0}},
],
[
[
{"type": Type.STATE, "name": "test_stream", "stream_state": {}},
{"type": Type.RECORD, "name": "test_stream", "data": {"date": "2022-05-07"}},
{"type": Type.RECORD, "name": "test_stream", "data": {"date": "2022-05-08"}},
{"type": Type.STATE, "name": "test_stream", "stream_state": {"date": "2022-05-09"}},
{"type": Type.STATE, "name": "test_stream", "stream_state": {}, "sourceStats": {"recordCount": 0.0}},
{"type": Type.RECORD, "name": "test_stream", "data": {"date": "2022-05-04"}},
{"type": Type.RECORD, "name": "test_stream", "data": {"date": "2022-05-05"}},
{"type": Type.STATE, "name": "test_stream", "stream_state": {"date": "2022-05-11"}},
{"type": Type.STATE, "name": "test_stream", "stream_state": {"date": "2022-05-05"}, "sourceStats": {"recordCount": 2.0}},
{"type": Type.RECORD, "name": "test_stream", "data": {"date": "2022-05-07"}},
{"type": Type.RECORD, "name": "test_stream", "data": {"date": "2022-05-08"}},
{"type": Type.STATE, "name": "test_stream", "stream_state": {"date": "2022-05-08"}, "sourceStats": {"recordCount": 2.0}},
{"type": Type.RECORD, "name": "test_stream", "data": {"date": "2022-05-10"}},
{"type": Type.RECORD, "name": "test_stream", "data": {"date": "2022-05-11"}},
{"type": Type.RECORD, "name": "test_stream", "data": {"date": "2022-05-12"}},
{"type": Type.STATE, "name": "test_stream", "stream_state": {"date": "2022-05-13"}},
{"type": Type.STATE, "name": "test_stream", "stream_state": {"date": "2022-05-12"}, "sourceStats": {"recordCount": 3.0}},
]
],
IncrementalConfig(),
pytest.raises(AssertionError, match="Records for subsequent reads with new state should be different"),
id="test_incremental_returns_identical",
),
pytest.param(
[
{"type": Type.STATE, "name": "test_stream", "stream_state": {}},
{"type": Type.STATE, "name": "test_stream", "stream_state": {}, "sourceStats": {"recordCount": 0.0}},
{"type": Type.RECORD, "name": "test_stream", "data": {"date": "2022-05-07"}},
{"type": Type.RECORD, "name": "test_stream", "data": {"date": "2022-05-08"}},
{"type": Type.STATE, "name": "test_stream", "stream_state": {"date": "2022-05-09"}},
{"type": Type.RECORD, "name": "test_stream", "data": {"date": "2022-05-09"}},
{"type": Type.RECORD, "name": "test_stream", "data": {"date": "2022-05-10"}},
{"type": Type.STATE, "name": "test_stream", "stream_state": {"date": "2022-05-11"}},
{"type": Type.RECORD, "name": "test_stream_2", "data": {"date": "2022-05-11"}},
{"type": Type.RECORD, "name": "test_stream_2", "data": {"date": "2022-05-12"}},
{
"type": Type.STATE,
"name": "test_stream_2",
"stream_state": {"date": "2022-05-13"},
"data": {"test_stream": {"date": "2022-05-11"}, "test_stream_2": {"date": "2022-05-13"}},
},
{"type": Type.RECORD, "name": "test_stream_2", "data": {"date": "2022-05-13"}},
{"type": Type.RECORD, "name": "test_stream_2", "data": {"date": "2022-05-14"}},
{
"type": Type.STATE,
"name": "test_stream_2",
"stream_state": {"date": "2022-05-15"},
"data": {"test_stream": {"date": "2022-05-11"}, "test_stream_2": {"date": "2022-05-15"}},
},
],
[
[
{"type": Type.STATE, "name": "test_stream", "stream_state": {"date": "2022-05-09"}},
{"type": Type.RECORD, "name": "test_stream", "data": {"date": "2022-05-09"}},
{"type": Type.RECORD, "name": "test_stream", "data": {"date": "2022-05-10"}},
{"type": Type.STATE, "name": "test_stream", "stream_state": {"date": "2022-05-11"}},
{"type": Type.RECORD, "name": "test_stream_2", "data": {"date": "2022-05-11"}},
{"type": Type.RECORD, "name": "test_stream_2", "data": {"date": "2022-05-12"}},
{
"type": Type.STATE,
"name": "test_stream_2",
"stream_state": {"date": "2022-05-13"},
"data": {"test_stream": {"date": "2022-05-11"}, "test_stream_2": {"date": "2022-05-13"}},
},
{"type": Type.RECORD, "name": "test_stream_2", "data": {"date": "2022-05-13"}},
{"type": Type.RECORD, "name": "test_stream_2", "data": {"date": "2022-05-14"}},
{
"type": Type.STATE,
"name": "test_stream_2",
"stream_state": {"date": "2022-05-15"},
"data": {"test_stream": {"date": "2022-05-11"}, "test_stream_2": {"date": "2022-05-15"}},
},
],
[
{"type": Type.STATE, "name": "test_stream", "stream_state": {"date": "2022-05-11"}},
{"type": Type.RECORD, "name": "test_stream_2", "data": {"date": "2022-05-11"}},
{"type": Type.RECORD, "name": "test_stream_2", "data": {"date": "2022-05-12"}},
{
"type": Type.STATE,
"name": "test_stream_2",
"stream_state": {"date": "2022-05-13"},
"data": {"test_stream": {"date": "2022-05-11"}, "test_stream_2": {"date": "2022-05-13"}},
},
{"type": Type.RECORD, "name": "test_stream_2", "data": {"date": "2022-05-13"}},
{"type": Type.RECORD, "name": "test_stream_2", "data": {"date": "2022-05-14"}},
{
"type": Type.STATE,
"name": "test_stream_2",
"stream_state": {"date": "2022-05-15"},
"data": {"test_stream": {"date": "2022-05-11"}, "test_stream_2": {"date": "2022-05-15"}},
},
],
[
{
"type": Type.STATE,
"name": "test_stream_2",
"stream_state": {"date": "2022-05-13"},
"data": {"test_stream": {"date": "2022-05-11"}, "test_stream_2": {"date": "2022-05-13"}},
},
{"type": Type.RECORD, "name": "test_stream_2", "data": {"date": "2022-05-13"}},
{"type": Type.RECORD, "name": "test_stream_2", "data": {"date": "2022-05-14"}},
{
"type": Type.STATE,
"name": "test_stream_2",
"stream_state": {"date": "2022-05-15"},
"data": {"test_stream": {"date": "2022-05-11"}, "test_stream_2": {"date": "2022-05-15"}},
},
],
[
{
"type": Type.STATE,
"name": "test_stream_2",
"stream_state": {"date": "2022-05-15"},
"data": {"test_stream": {"date": "2022-05-11"}, "test_stream_2": {"date": "2022-05-15"}},
},
],
],
does_not_raise(),
id="test_incremental_with_multiple_streams",
),
pytest.param(
[
{"type": Type.STATE, "name": "test_stream", "stream_state": None},
{"type": Type.RECORD, "name": "test_stream", "data": {"date": "2022-05-07"}},
{"type": Type.RECORD, "name": "test_stream", "data": {"date": "2022-05-08"}},
{"type": Type.STATE, "name": "test_stream", "stream_state": {"date": "2022-05-09"}},
],
[
[
{"type": Type.STATE, "name": "test_stream", "stream_state": {"date": "2022-05-09"}},
{"type": Type.RECORD, "name": "test_stream", "data": {"date": "2022-05-09"}},
{"type": Type.RECORD, "name": "test_stream", "data": {"date": "2022-05-10"}},
{"type": Type.STATE, "name": "test_stream", "stream_state": {"date": "2022-05-11"}},
{"type": Type.RECORD, "name": "test_stream", "data": {"date": "2022-05-11"}},
{"type": Type.RECORD, "name": "test_stream", "data": {"date": "2022-05-12"}},
{"type": Type.STATE, "name": "test_stream", "stream_state": {"date": "2022-05-13"}},
],
],
does_not_raise(),
id="test_incremental_with_none_state",
),
pytest.param(
[
{"type": Type.STATE, "name": "test_stream", "stream_state": {}},
{"type": Type.RECORD, "name": "test_stream", "data": {"date": "2022-05-07"}},
{"type": Type.RECORD, "name": "test_stream", "data": {"date": "2022-05-08"}},
{"type": Type.STATE, "name": "test_stream", "stream_state": {"date": "2022-05-09"}},
{"type": Type.STATE, "name": "test_stream", "stream_state": {"date": "2022-05-08"}, "sourceStats": {"recordCount": 2.0}},
],
[
[],
],
IncrementalConfig(),
does_not_raise(),
id="test_incremental_with_empty_second_read",
),
],
)
# NOTE(review): the span below is unified-diff residue — it contains ``@@`` hunk
# headers and BOTH the pre-change and post-change variants of this test interleaved
# (duplicate ``async def`` signatures, duplicate list-comprehension builders).
# It is not valid Python as-is; restore this region from version control before running.
@pytest.mark.parametrize(
    "run_per_stream_test",
    [
        pytest.param(False, id="test_read_with_multiple_states_using_a_mock_connector_emitting_legacy_state"),
        pytest.param(True, id="test_read_with_multiple_states_using_a_mock_connector_emitting_per_stream_state"),
    ],
)
# Pre-change signature (parametrized over legacy vs per-stream state emission):
async def test_per_stream_read_with_multiple_states(mocker, first_records, subsequent_records, expected_error, run_per_stream_test):
# Post-change signature (always per-stream; takes the IncrementalConfig ``inputs`` fixture):
async def test_per_stream_read_with_multiple_states(mocker, first_records, subsequent_records, inputs, expected_error):
catalog = ConfiguredAirbyteCatalog(
streams=[
ConfiguredAirbyteStream(
# Hunk header — the first stream's ``stream=AirbyteStream(...)`` block was elided here.
@@ -483,56 +357,29 @@ async def test_per_stream_read_with_multiple_states(mocker, first_records, subse
sync_mode=SyncMode.incremental,
destination_sync_mode=DestinationSyncMode.overwrite,
cursor_field=["date"],
),
ConfiguredAirbyteStream(
stream=AirbyteStream(
name="test_stream_2",
json_schema={"type": "object", "properties": {"date": {"type": "date"}}},
supported_sync_modes=[SyncMode.full_refresh, SyncMode.incremental],
),
sync_mode=SyncMode.incremental,
destination_sync_mode=DestinationSyncMode.overwrite,
cursor_field=["date"],
),
)
]
)
# Pre-change builder (branched on ``run_per_stream_test``) …
if run_per_stream_test:
call_read_output_messages = [
# … interleaved with the post-change builder (always per-stream, forwarding sourceStats):
call_read_output_messages = [
build_per_stream_state_message(
descriptor=StreamDescriptor(name=record["name"]), stream_state=record["stream_state"], data=record.get("data", None), source_stats=record.get("sourceStats")
)
if record["type"] == Type.STATE
else build_record_message(record["name"], record["data"])
for record in list(first_records)
]
call_read_with_state_output_messages = [
[
build_per_stream_state_message(
descriptor=StreamDescriptor(name=record["name"]), stream_state=record["stream_state"], data=record.get("data", None)
descriptor=StreamDescriptor(name=record["name"]), stream_state=record["stream_state"], data=record.get("data", None), source_stats=record.get("sourceStats")
)
if record["type"] == Type.STATE
else build_record_message(record["name"], record["data"])
for record in list(first_records)
]
call_read_with_state_output_messages = [
[
build_per_stream_state_message(
descriptor=StreamDescriptor(name=record["name"]), stream_state=record["stream_state"], data=record.get("data", None)
)
if record["type"] == Type.STATE
else build_record_message(stream=record["name"], data=record["data"])
for record in state_records_group
]
for state_records_group in list(subsequent_records)
]
# Pre-change legacy-state builder (removed by the commit):
else:
call_read_output_messages = [
build_state_message(state=record.get("data") or {record["name"]: record["stream_state"]})
if record["type"] == Type.STATE
else build_record_message(stream=record["name"], data=record["data"])
for record in list(first_records)
]
call_read_with_state_output_messages = [
[
build_state_message(state=record.get("data") or {record["name"]: record["stream_state"]})
if record["type"] == Type.STATE
else build_record_message(stream=record["name"], data=record["data"])
for record in state_records_group
]
for state_records_group in list(subsequent_records)
for record in state_records_group
]
for state_records_group in list(subsequent_records)
]
docker_runner_mock = MagicMock()
docker_runner_mock.call_read = mocker.AsyncMock(return_value=call_read_output_messages)
# Hunk header — the mocked read-with-state setup and the test invocation were elided here.
@@ -545,10 +392,78 @@ async def test_per_stream_read_with_multiple_states(mocker, first_records, subse
connector_config=MagicMock(),
configured_catalog_for_incremental=catalog,
docker_runner=docker_runner_mock,
# Pre-change hard-coded config vs post-change fixture-provided config:
inputs=IncrementalConfig(),
inputs=inputs,
)
@pytest.mark.parametrize(
    "non_unique_states, expected_unique_states, expected_record_count_per_state",
    [
        pytest.param(
            [
                {"type": Type.STATE, "name": "test_stream", "stream_state": {}, "sourceStats": {"recordCount": 0.0}},
                {"type": Type.STATE, "name": "test_stream", "stream_state": {}, "sourceStats": {"recordCount": 0.0}},
                {"type": Type.STATE, "name": "test_stream", "stream_state": {}, "sourceStats": {"recordCount": 0.0}},
            ],
            [],
            [],
            id="combine_three_duplicates_into_a_single_state_message",
        ),
        pytest.param(
            [
                {"type": Type.STATE, "name": "test_stream", "stream_state": {"date": "2022-05-08"}, "sourceStats": {"recordCount": 2.0}},
                {"type": Type.STATE, "name": "test_stream", "stream_state": {"date": "2022-05-08"}, "sourceStats": {"recordCount": 0.0}},
            ],
            [
                {"type": Type.STATE, "name": "test_stream", "stream_state": {"date": "2022-05-08"}, "sourceStats": {"recordCount": 2.0}},
            ],
            [0.0],
            id="multiple_equal_states_with_different_sourceStats_considered_to_be_equal",
        ),
        pytest.param(
            [
                {"type": Type.STATE, "name": "test_stream", "stream_state": {}, "sourceStats": {"recordCount": 0.0}},
                {"type": Type.STATE, "name": "test_stream", "stream_state": {}, "sourceStats": {"recordCount": 0.0}},
                {"type": Type.STATE, "name": "test_stream", "stream_state": {}, "sourceStats": {"recordCount": 0.0}},
                {"type": Type.STATE, "name": "test_stream", "stream_state": {"date": "2022-05-08"}, "sourceStats": {"recordCount": 2.0}},
                {"type": Type.STATE, "name": "test_stream", "stream_state": {"date": "2022-05-08"}, "sourceStats": {"recordCount": 0.0}},
                {"type": Type.STATE, "name": "test_stream", "stream_state": {"date": "2022-05-08"}, "sourceStats": {"recordCount": 0.0}},
                {"type": Type.STATE, "name": "test_stream", "stream_state": {"date": "2022-05-08"}, "sourceStats": {"recordCount": 0.0}},
                {"type": Type.STATE, "name": "test_stream", "stream_state": {"date": "2022-05-10"}, "sourceStats": {"recordCount": 7.0}},
                {"type": Type.STATE, "name": "test_stream", "stream_state": {"date": "2022-05-12"}, "sourceStats": {"recordCount": 3.0}},
            ],
            [
                {"type": Type.STATE, "name": "test_stream", "stream_state": {"date": "2022-05-08"}, "sourceStats": {"recordCount": 2.0}},
                {"type": Type.STATE, "name": "test_stream", "stream_state": {"date": "2022-05-10"}, "sourceStats": {"recordCount": 7.0}},
                {"type": Type.STATE, "name": "test_stream", "stream_state": {"date": "2022-05-12"}, "sourceStats": {"recordCount": 3.0}},
            ],
            [10.0, 3.0, 0.0],
            # Fix: this case previously had no id=, unlike its sibling params.
            id="record_counts_are_accumulated_across_duplicate_states",
        ),
    ],
)
async def test_get_unique_state_messages(non_unique_states, expected_unique_states, expected_record_count_per_state):
    """Deduplicating state messages keeps one message per distinct stream_state.

    ``expected_record_count_per_state`` appears to be, for each surviving state, the
    number of records a sync resumed from that state should produce (derived from
    the ``sourceStats.recordCount`` of the messages that follow it) — see
    ``_TestIncremental._get_unique_state_messages_with_record_count`` for the exact
    accumulation rule.
    """

    def _to_state_messages(raw_states):
        # Build protocol-level state messages from the plain-dict fixtures above.
        return [
            build_per_stream_state_message(
                descriptor=StreamDescriptor(name=state["name"]),
                stream_state=state["stream_state"],
                data=state.get("data", None),
                source_stats=state.get("sourceStats"),
            )
            for state in raw_states
        ]

    non_unique_states = _to_state_messages(non_unique_states)
    expected_unique_states = _to_state_messages(expected_unique_states)

    actual_unique_states = _TestIncremental()._get_unique_state_messages_with_record_count(non_unique_states)

    # Guard against zip() silently truncating: all three sequences must line up.
    assert len(actual_unique_states) == len(expected_unique_states) == len(expected_record_count_per_state)
    for (actual_state, actual_record_count), expected_state, expected_record_count in zip(
        actual_unique_states, expected_unique_states, expected_record_count_per_state
    ):
        assert actual_state == expected_state
        assert actual_record_count == expected_record_count
async def test_config_skip_test(mocker):
    docker_runner_mock = MagicMock()
    docker_runner_mock.call_read = mocker.AsyncMock(return_value=[])
# NOTE(review): unified-diff hunk header below — the body of this test (~50 lines in
# the original file) was elided by the diff rendering; restore from version control.
@@ -578,59 +493,6 @@ async def test_config_skip_test(mocker):
    docker_runner_mock.call_read.assert_not_called()
async def test_state_skip_test(mocker):
    """The sequential-slices incremental test should skip itself when the mocked read
    produces too few distinct state messages: the initial read runs, but the
    follow-up read-with-state must never be issued."""
    docker_runner_mock = MagicMock()

    # Raw fixture for a single stream: records interleaved with only two state messages.
    first_records = [
        {"type": Type.STATE, "name": "test_stream", "stream_state": {}},
        {"type": Type.RECORD, "name": "test_stream", "data": {"date": "2022-05-07"}},
        {"type": Type.RECORD, "name": "test_stream", "data": {"date": "2022-05-08"}},
        {"type": Type.STATE, "name": "test_stream", "stream_state": {"date": "2022-05-09"}},
        {"type": Type.RECORD, "name": "test_stream", "data": {"date": "2022-05-09"}},
        {"type": Type.RECORD, "name": "test_stream", "data": {"date": "2022-05-10"}},
        {"type": Type.RECORD, "name": "test_stream", "data": {"date": "2022-05-11"}},
        {"type": Type.RECORD, "name": "test_stream", "data": {"date": "2022-05-12"}},
    ]

    # Translate the plain dicts into protocol messages for the mocked read call.
    call_read_output_messages = []
    for entry in first_records:
        if entry["type"] == Type.STATE:
            message = build_per_stream_state_message(
                descriptor=StreamDescriptor(name=entry["name"]), stream_state=entry["stream_state"], data=entry.get("data", None)
            )
        else:
            message = build_record_message(entry["name"], entry["data"])
        call_read_output_messages.append(message)

    # There needs to be at least 3 state messages for the test to run
    docker_runner_mock.call_read = mocker.AsyncMock(return_value=call_read_output_messages)

    configured_stream = ConfiguredAirbyteStream(
        stream=AirbyteStream(
            name="test_stream",
            json_schema={"type": "object", "properties": {"date": {"type": "date"}}},
            supported_sync_modes=[SyncMode.full_refresh, SyncMode.incremental],
        ),
        sync_mode=SyncMode.incremental,
        destination_sync_mode=DestinationSyncMode.overwrite,
        cursor_field=["date"],
    )

    test_instance = _TestIncremental()
    with patch.object(pytest, "skip", return_value=None):
        await test_instance.test_read_sequential_slices(
            inputs=IncrementalConfig(),
            connector_config=MagicMock(),
            configured_catalog_for_incremental=ConfiguredAirbyteCatalog(streams=[configured_stream]),
            docker_runner=docker_runner_mock,
        )

    # The first read must have happened; the with-state read must have been skipped.
    docker_runner_mock.call_read.assert_called()
    docker_runner_mock.call_read_with_state.assert_not_called()
@pytest.mark.parametrize(
"read_output, expectation",
[