diff --git a/airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py b/airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py index 8cfcb0b5bea..c6fc000b149 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py @@ -13,6 +13,7 @@ from airbyte_cdk.models import ( AirbyteLogMessage, AirbyteMessage, AirbyteStateMessage, + AirbyteStreamStatus, ConfiguredAirbyteCatalog, ConfiguredAirbyteStream, Level, @@ -28,6 +29,7 @@ from airbyte_cdk.sources.streams.http.http import HttpStream from airbyte_cdk.sources.utils.record_helper import stream_data_to_airbyte_message from airbyte_cdk.sources.utils.schema_helpers import InternalConfig, split_config from airbyte_cdk.utils.event_timing import create_timer +from airbyte_cdk.utils.stream_status_utils import as_airbyte_message as stream_status_as_airbyte_message from airbyte_cdk.utils.traced_exception import AirbyteTracedException @@ -113,6 +115,8 @@ class AbstractSource(Source, ABC): continue try: timer.start_event(f"Syncing stream {configured_stream.stream.name}") + logger.info(f"Marking stream {configured_stream.stream.name} as STARTED") + yield stream_status_as_airbyte_message(configured_stream, AirbyteStreamStatus.STARTED) yield from self._read_stream( logger=logger, stream_instance=stream_instance, @@ -120,10 +124,15 @@ class AbstractSource(Source, ABC): state_manager=state_manager, internal_config=internal_config, ) + logger.info(f"Marking stream {configured_stream.stream.name} as STOPPED") + yield stream_status_as_airbyte_message(configured_stream, AirbyteStreamStatus.COMPLETE) except AirbyteTracedException as e: + yield stream_status_as_airbyte_message(configured_stream, AirbyteStreamStatus.INCOMPLETE) raise e except Exception as e: logger.exception(f"Encountered an exception while reading stream {configured_stream.stream.name}") + logger.info(f"Marking stream {configured_stream.stream.name} as STOPPED") + yield 
stream_status_as_airbyte_message(configured_stream, AirbyteStreamStatus.INCOMPLETE) display_message = stream_instance.get_error_display_message(e) if display_message: raise AirbyteTracedException.from_exception(e, message=display_message) from e @@ -185,6 +194,10 @@ class AbstractSource(Source, ABC): for record in record_iterator: if record.type == MessageType.RECORD: record_counter += 1 + if record_counter == 1: + logger.info(f"Marking stream {stream_name} as RUNNING") + # If we just read the first record of the stream, emit the transition to the RUNNING state + yield stream_status_as_airbyte_message(configured_stream, AirbyteStreamStatus.RUNNING) yield record logger.info(f"Read {record_counter} records from {stream_name} stream") diff --git a/airbyte-cdk/python/airbyte_cdk/utils/stream_status_utils.py b/airbyte-cdk/python/airbyte_cdk/utils/stream_status_utils.py new file mode 100644 index 00000000000..cd9e4527688 --- /dev/null +++ b/airbyte-cdk/python/airbyte_cdk/utils/stream_status_utils.py @@ -0,0 +1,36 @@ +# +# Copyright (c) 2023 Airbyte, Inc., all rights reserved. 
+# + + +from datetime import datetime + +from airbyte_cdk.models import ( + AirbyteMessage, + AirbyteStreamStatus, + AirbyteStreamStatusTraceMessage, + AirbyteTraceMessage, + ConfiguredAirbyteStream, + StreamDescriptor, + TraceType, +) +from airbyte_cdk.models import Type as MessageType + + +def as_airbyte_message(stream: ConfiguredAirbyteStream, current_status: AirbyteStreamStatus) -> AirbyteMessage: + """ + Builds an AirbyteStreamStatusTraceMessage for the provided stream + """ + + now_millis = datetime.now().timestamp() * 1000.0 + + trace_message = AirbyteTraceMessage( + type=TraceType.STREAM_STATUS, + emitted_at=now_millis, + stream_status=AirbyteStreamStatusTraceMessage( + stream_descriptor=StreamDescriptor(name=stream.stream.name, namespace=stream.stream.namespace), + status=current_status, + ), + ) + + return AirbyteMessage(type=MessageType.TRACE, trace=trace_message) diff --git a/airbyte-cdk/python/setup.py b/airbyte-cdk/python/setup.py index 8731a2d3ad7..1c9e2d97f23 100644 --- a/airbyte-cdk/python/setup.py +++ b/airbyte-cdk/python/setup.py @@ -46,7 +46,7 @@ setup( packages=find_packages(exclude=("unit_tests",)), package_data={"airbyte_cdk": ["py.typed", "sources/declarative/declarative_component_schema.yaml"]}, install_requires=[ - "airbyte-protocol-models==1.0.0", + "airbyte-protocol-models==0.3.6", "backoff", "dpath~=2.0.1", "isodate~=0.6.1", diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/test_manifest_declarative_source.py b/airbyte-cdk/python/unit_tests/sources/declarative/test_manifest_declarative_source.py index 3358f7baf03..9c28997e917 100644 --- a/airbyte-cdk/python/unit_tests/sources/declarative/test_manifest_declarative_source.py +++ b/airbyte-cdk/python/unit_tests/sources/declarative/test_manifest_declarative_source.py @@ -1242,7 +1242,7 @@ def _create_page(response_body): def test_read_manifest_declarative_source(test_name, manifest, pages, expected_records, expected_calls): _stream_name = "Rates" with patch.object(HttpStream, 
"_fetch_next_page", side_effect=pages) as mock_http_stream: - output_data = [message.record.data for message in _run_read(manifest, _stream_name)] + output_data = [message.record.data for message in _run_read(manifest, _stream_name) if message.record] assert expected_records == output_data mock_http_stream.assert_has_calls(expected_calls) diff --git a/airbyte-cdk/python/unit_tests/sources/test_abstract_source.py b/airbyte-cdk/python/unit_tests/sources/test_abstract_source.py index e9af077f02e..aaa347ed351 100644 --- a/airbyte-cdk/python/unit_tests/sources/test_abstract_source.py +++ b/airbyte-cdk/python/unit_tests/sources/test_abstract_source.py @@ -21,6 +21,9 @@ from airbyte_cdk.models import ( AirbyteStateType, AirbyteStream, AirbyteStreamState, + AirbyteStreamStatus, + AirbyteStreamStatusTraceMessage, + AirbyteTraceMessage, ConfiguredAirbyteCatalog, ConfiguredAirbyteStream, DestinationSyncMode, @@ -28,8 +31,10 @@ from airbyte_cdk.models import ( Status, StreamDescriptor, SyncMode, - Type, + TraceType, ) +from airbyte_cdk.models import Type +from airbyte_cdk.models import Type as MessageType from airbyte_cdk.sources import AbstractSource from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager from airbyte_cdk.sources.streams import IncrementalMixin, Stream @@ -250,6 +255,19 @@ def _as_records(stream: str, data: List[Dict[str, Any]]) -> List[AirbyteMessage] return [_as_record(stream, datum) for datum in data] +def _as_stream_status(stream: str, status: AirbyteStreamStatus) -> AirbyteMessage: + trace_message = AirbyteTraceMessage( + emitted_at=datetime.datetime.now().timestamp() * 1000.0, + type=TraceType.STREAM_STATUS, + stream_status=AirbyteStreamStatusTraceMessage( + stream_descriptor=StreamDescriptor(name=stream), + status=status, + ), + ) + + return AirbyteMessage(type=MessageType.TRACE, trace=trace_message) + + def _as_state(state_data: Dict[str, Any], stream_name: str = "", per_stream_state: Dict[str, Any] = None): if per_stream_state: 
return AirbyteMessage( @@ -277,6 +295,8 @@ def _fix_emitted_at(messages: List[AirbyteMessage]) -> List[AirbyteMessage]: for msg in messages: if msg.type == Type.RECORD and msg.record: msg.record.emitted_at = GLOBAL_EMITTED_AT + if msg.type == Type.TRACE and msg.trace: + msg.trace.emitted_at = GLOBAL_EMITTED_AT return messages @@ -296,7 +316,17 @@ def test_valid_full_refresh_read_no_slices(mocker): ] ) - expected = _as_records("s1", stream_output) + _as_records("s2", stream_output) + expected = _fix_emitted_at( + [ + _as_stream_status("s1", AirbyteStreamStatus.STARTED), + _as_stream_status("s1", AirbyteStreamStatus.RUNNING), + *_as_records("s1", stream_output), + _as_stream_status("s1", AirbyteStreamStatus.COMPLETE), + _as_stream_status("s2", AirbyteStreamStatus.STARTED), + _as_stream_status("s2", AirbyteStreamStatus.RUNNING), + *_as_records("s2", stream_output), + _as_stream_status("s2", AirbyteStreamStatus.COMPLETE) + ]) messages = _fix_emitted_at(list(src.read(logger, {}, catalog))) assert expected == messages @@ -326,7 +356,17 @@ def test_valid_full_refresh_read_with_slices(mocker): ] ) - expected = [*_as_records("s1", slices), *_as_records("s2", slices)] + expected = _fix_emitted_at( + [ + _as_stream_status("s1", AirbyteStreamStatus.STARTED), + _as_stream_status("s1", AirbyteStreamStatus.RUNNING), + *_as_records("s1", slices), + _as_stream_status("s1", AirbyteStreamStatus.COMPLETE), + _as_stream_status("s2", AirbyteStreamStatus.STARTED), + _as_stream_status("s2", AirbyteStreamStatus.RUNNING), + *_as_records("s2", slices), + _as_stream_status("s2", AirbyteStreamStatus.COMPLETE) + ]) messages = _fix_emitted_at(list(src.read(logger, {}, catalog))) @@ -448,18 +488,24 @@ class TestIncrementalRead: ] ) - expected = [ + expected = _fix_emitted_at([ + _as_stream_status("s1", AirbyteStreamStatus.STARTED), + _as_stream_status("s1", AirbyteStreamStatus.RUNNING), _as_record("s1", stream_output[0]), _as_record("s1", stream_output[1]), _as_state({"s1": 
new_state_from_connector}, "s1", new_state_from_connector) if per_stream_enabled else _as_state({"s1": new_state_from_connector}), + _as_stream_status("s1", AirbyteStreamStatus.COMPLETE), + _as_stream_status("s2", AirbyteStreamStatus.STARTED), + _as_stream_status("s2", AirbyteStreamStatus.RUNNING), _as_record("s2", stream_output[0]), _as_record("s2", stream_output[1]), _as_state({"s1": new_state_from_connector, "s2": new_state_from_connector}, "s2", new_state_from_connector) if per_stream_enabled else _as_state({"s1": new_state_from_connector, "s2": new_state_from_connector}), - ] + _as_stream_status("s2", AirbyteStreamStatus.COMPLETE), + ]) messages = _fix_emitted_at(list(src.read(logger, {}, catalog, state=input_state))) assert messages == expected @@ -521,18 +567,24 @@ class TestIncrementalRead: ] ) - expected = [ + expected = _fix_emitted_at([ + _as_stream_status("s1", AirbyteStreamStatus.STARTED), + _as_stream_status("s1", AirbyteStreamStatus.RUNNING), _as_record("s1", stream_output[0]), _as_state({"s1": state}, "s1", state) if per_stream_enabled else _as_state({"s1": state}), _as_record("s1", stream_output[1]), _as_state({"s1": state}, "s1", state) if per_stream_enabled else _as_state({"s1": state}), _as_state({"s1": state}, "s1", state) if per_stream_enabled else _as_state({"s1": state}), + _as_stream_status("s1", AirbyteStreamStatus.COMPLETE), + _as_stream_status("s2", AirbyteStreamStatus.STARTED), + _as_stream_status("s2", AirbyteStreamStatus.RUNNING), _as_record("s2", stream_output[0]), _as_state({"s1": state, "s2": state}, "s2", state) if per_stream_enabled else _as_state({"s1": state, "s2": state}), _as_record("s2", stream_output[1]), _as_state({"s1": state, "s2": state}, "s2", state) if per_stream_enabled else _as_state({"s1": state, "s2": state}), _as_state({"s1": state, "s2": state}, "s2", state) if per_stream_enabled else _as_state({"s1": state, "s2": state}), - ] + _as_stream_status("s2", AirbyteStreamStatus.COMPLETE), + ]) messages = 
_fix_emitted_at(list(src.read(logger, {}, catalog, state=input_state))) assert expected == messages @@ -582,12 +634,18 @@ class TestIncrementalRead: ] ) - expected = [ + expected = _fix_emitted_at([ + _as_stream_status("s1", AirbyteStreamStatus.STARTED), + _as_stream_status("s1", AirbyteStreamStatus.RUNNING), *_as_records("s1", stream_output), _as_state({"s1": state}, "s1", state) if per_stream_enabled else _as_state({"s1": state}), + _as_stream_status("s1", AirbyteStreamStatus.COMPLETE), + _as_stream_status("s2", AirbyteStreamStatus.STARTED), + _as_stream_status("s2", AirbyteStreamStatus.RUNNING), *_as_records("s2", stream_output), _as_state({"s1": state, "s2": state}, "s2", state) if per_stream_enabled else _as_state({"s1": state, "s2": state}), - ] + _as_stream_status("s2", AirbyteStreamStatus.COMPLETE), + ]) messages = _fix_emitted_at(list(src.read(logger, {}, catalog, state=input_state))) @@ -658,20 +716,26 @@ class TestIncrementalRead: ] ) - expected = [ + expected = _fix_emitted_at([ + _as_stream_status("s1", AirbyteStreamStatus.STARTED), + _as_stream_status("s1", AirbyteStreamStatus.RUNNING), # stream 1 slice 1 *_as_records("s1", stream_output), _as_state({"s1": state}, "s1", state) if per_stream_enabled else _as_state({"s1": state}), # stream 1 slice 2 *_as_records("s1", stream_output), _as_state({"s1": state}, "s1", state) if per_stream_enabled else _as_state({"s1": state}), + _as_stream_status("s1", AirbyteStreamStatus.COMPLETE), + _as_stream_status("s2", AirbyteStreamStatus.STARTED), + _as_stream_status("s2", AirbyteStreamStatus.RUNNING), # stream 2 slice 1 *_as_records("s2", stream_output), _as_state({"s1": state, "s2": state}, "s2", state) if per_stream_enabled else _as_state({"s1": state, "s2": state}), # stream 2 slice 2 *_as_records("s2", stream_output), _as_state({"s1": state, "s2": state}, "s2", state) if per_stream_enabled else _as_state({"s1": state, "s2": state}), - ] + _as_stream_status("s2", AirbyteStreamStatus.COMPLETE), + ]) messages = 
_fix_emitted_at(list(src.read(logger, {}, catalog, state=input_state))) @@ -753,10 +817,14 @@ class TestIncrementalRead: ] ) - expected = [ + expected = _fix_emitted_at([ + _as_stream_status("s1", AirbyteStreamStatus.STARTED), _as_state({"s1": state}, "s1", state) if per_stream_enabled else _as_state({"s1": state}), + _as_stream_status("s1", AirbyteStreamStatus.COMPLETE), + _as_stream_status("s2", AirbyteStreamStatus.STARTED), _as_state({"s1": state, "s2": state}, "s2", state) if per_stream_enabled else _as_state({"s1": state, "s2": state}), - ] + _as_stream_status("s2", AirbyteStreamStatus.COMPLETE), + ]) messages = _fix_emitted_at(list(src.read(logger, {}, catalog, state=input_state))) @@ -837,8 +905,10 @@ class TestIncrementalRead: ] ) - expected = [ + expected = _fix_emitted_at([ # stream 1 slice 1 + _as_stream_status("s1", AirbyteStreamStatus.STARTED), + _as_stream_status("s1", AirbyteStreamStatus.RUNNING), _as_record("s1", stream_output[0]), _as_record("s1", stream_output[1]), _as_state({"s1": state}, "s1", state) if per_stream_enabled else _as_state({"s1": state}), @@ -850,7 +920,10 @@ class TestIncrementalRead: _as_state({"s1": state}, "s1", state) if per_stream_enabled else _as_state({"s1": state}), _as_record("s1", stream_output[2]), _as_state({"s1": state}, "s1", state) if per_stream_enabled else _as_state({"s1": state}), + _as_stream_status("s1", AirbyteStreamStatus.COMPLETE), # stream 2 slice 1 + _as_stream_status("s2", AirbyteStreamStatus.STARTED), + _as_stream_status("s2", AirbyteStreamStatus.RUNNING), _as_record("s2", stream_output[0]), _as_record("s2", stream_output[1]), _as_state({"s1": state, "s2": state}, "s2", state) if per_stream_enabled else _as_state({"s1": state, "s2": state}), @@ -862,7 +935,8 @@ class TestIncrementalRead: _as_state({"s1": state, "s2": state}, "s2", state) if per_stream_enabled else _as_state({"s1": state, "s2": state}), _as_record("s2", stream_output[2]), _as_state({"s1": state, "s2": state}, "s2", state) if 
per_stream_enabled else _as_state({"s1": state, "s2": state}), - ] + _as_stream_status("s2", AirbyteStreamStatus.COMPLETE), + ]) messages = _fix_emitted_at(list(src.read(logger, {}, catalog, state=input_state))) @@ -942,6 +1016,8 @@ class TestIncrementalRead: expected = _fix_emitted_at( [ + _as_stream_status("s1", AirbyteStreamStatus.STARTED), + _as_stream_status("s1", AirbyteStreamStatus.RUNNING), # stream 1 slice 1 stream_data_to_airbyte_message("s1", stream_output[0]), stream_data_to_airbyte_message("s1", stream_output[1]), @@ -956,7 +1032,10 @@ class TestIncrementalRead: _as_state({"s1": state}, "s1", state) if per_stream_enabled else _as_state({"s1": state}), stream_data_to_airbyte_message("s1", stream_output[3]), _as_state({"s1": state}, "s1", state) if per_stream_enabled else _as_state({"s1": state}), + _as_stream_status("s1", AirbyteStreamStatus.COMPLETE), # stream 2 slice 1 + _as_stream_status("s2", AirbyteStreamStatus.STARTED), + _as_stream_status("s2", AirbyteStreamStatus.RUNNING), stream_data_to_airbyte_message("s2", stream_output[0]), stream_data_to_airbyte_message("s2", stream_output[1]), stream_data_to_airbyte_message("s2", stream_output[2]), @@ -970,6 +1049,7 @@ class TestIncrementalRead: _as_state({"s1": state, "s2": state}, "s2", state) if per_stream_enabled else _as_state({"s1": state, "s2": state}), stream_data_to_airbyte_message("s2", stream_output[3]), _as_state({"s1": state, "s2": state}, "s2", state) if per_stream_enabled else _as_state({"s1": state, "s2": state}), + _as_stream_status("s2", AirbyteStreamStatus.COMPLETE), ] ) diff --git a/airbyte-cdk/python/unit_tests/sources/test_source.py b/airbyte-cdk/python/unit_tests/sources/test_source.py index f57956a8789..5237c81f87e 100644 --- a/airbyte-cdk/python/unit_tests/sources/test_source.py +++ b/airbyte-cdk/python/unit_tests/sources/test_source.py @@ -365,8 +365,8 @@ def test_internal_config(abstract_source, catalog): # Test with empty config logger = 
logging.getLogger(f"airbyte.{getattr(abstract_source, 'name', '')}") records = [r for r in abstract_source.read(logger=logger, config={}, catalog=catalog, state={})] - # 3 for http stream and 3 for non http stream - assert len(records) == 3 + 3 + # 3 for http stream, 3 for non http stream and 3 for stream status messages for each stream (2x) + assert len(records) == 3 + 3 + 3 + 3 assert http_stream.read_records.called assert non_http_stream.read_records.called # Make sure page_size havent been set @@ -375,21 +375,21 @@ def test_internal_config(abstract_source, catalog): # Test with records limit set to 1 internal_config = {"some_config": 100, "_limit": 1} records = [r for r in abstract_source.read(logger=logger, config=internal_config, catalog=catalog, state={})] - # 1 from http stream + 1 from non http stream - assert len(records) == 1 + 1 + # 1 from http stream + 1 from non http stream and 3 for stream status messages for each stream (2x) + assert len(records) == 1 + 1 + 3 + 3 assert "_limit" not in abstract_source.streams_config assert "some_config" in abstract_source.streams_config # Test with records limit set to number that exceeds expceted records internal_config = {"some_config": 100, "_limit": 20} records = [r for r in abstract_source.read(logger=logger, config=internal_config, catalog=catalog, state={})] - assert len(records) == 3 + 3 + assert len(records) == 3 + 3 + 3 + 3 # Check if page_size paramter is set to http instance only internal_config = {"some_config": 100, "_page_size": 2} records = [r for r in abstract_source.read(logger=logger, config=internal_config, catalog=catalog, state={})] assert "_page_size" not in abstract_source.streams_config assert "some_config" in abstract_source.streams_config - assert len(records) == 3 + 3 + assert len(records) == 3 + 3 + 3 + 3 assert http_stream.page_size == 2 # Make sure page_size havent been set for non http streams assert not non_http_stream.page_size @@ -402,6 +402,7 @@ def 
test_internal_config_limit(mocker, abstract_source, catalog): STREAM_LIMIT = 2 SLICE_DEBUG_LOG_COUNT = 1 FULL_RECORDS_NUMBER = 3 + TRACE_STATUS_COUNT = 3 streams = abstract_source.streams(None) http_stream = streams[0] http_stream.read_records.return_value = [{}] * FULL_RECORDS_NUMBER @@ -409,7 +410,7 @@ def test_internal_config_limit(mocker, abstract_source, catalog): catalog.streams[0].sync_mode = SyncMode.full_refresh records = [r for r in abstract_source.read(logger=logger_mock, config=internal_config, catalog=catalog, state={})] - assert len(records) == STREAM_LIMIT + SLICE_DEBUG_LOG_COUNT + assert len(records) == STREAM_LIMIT + SLICE_DEBUG_LOG_COUNT + TRACE_STATUS_COUNT logger_info_args = [call[0][0] for call in logger_mock.info.call_args_list] # Check if log line matches number of limit read_log_record = [_l for _l in logger_info_args if _l.startswith("Read")] @@ -418,14 +419,16 @@ def test_internal_config_limit(mocker, abstract_source, catalog): # No limit, check if state record produced for incremental stream catalog.streams[0].sync_mode = SyncMode.incremental records = [r for r in abstract_source.read(logger=logger_mock, config={}, catalog=catalog, state={})] - assert len(records) == FULL_RECORDS_NUMBER + SLICE_DEBUG_LOG_COUNT + 1 - assert records[-1].type == Type.STATE + assert len(records) == FULL_RECORDS_NUMBER + SLICE_DEBUG_LOG_COUNT + TRACE_STATUS_COUNT + 1 + assert records[-2].type == Type.STATE + assert records[-1].type == Type.TRACE # Set limit and check if state is produced when limit is set for incremental stream logger_mock.reset_mock() records = [r for r in abstract_source.read(logger=logger_mock, config=internal_config, catalog=catalog, state={})] - assert len(records) == STREAM_LIMIT + SLICE_DEBUG_LOG_COUNT + 1 - assert records[-1].type == Type.STATE + assert len(records) == STREAM_LIMIT + SLICE_DEBUG_LOG_COUNT + TRACE_STATUS_COUNT + 1 + assert records[-2].type == Type.STATE + assert records[-1].type == Type.TRACE logger_info_args = 
[call[0][0] for call in logger_mock.info.call_args_list] read_log_record = [_l for _l in logger_info_args if _l.startswith("Read")] assert read_log_record[0].startswith(f"Read {STREAM_LIMIT} ") @@ -436,6 +439,7 @@ SCHEMA = {"type": "object", "properties": {"value": {"type": "string"}}} def test_source_config_no_transform(mocker, abstract_source, catalog): SLICE_DEBUG_LOG_COUNT = 1 + TRACE_STATUS_COUNT = 3 logger_mock = mocker.MagicMock() logger_mock.level = logging.DEBUG streams = abstract_source.streams(None) @@ -443,7 +447,7 @@ def test_source_config_no_transform(mocker, abstract_source, catalog): http_stream.get_json_schema.return_value = non_http_stream.get_json_schema.return_value = SCHEMA http_stream.read_records.return_value, non_http_stream.read_records.return_value = [[{"value": 23}] * 5] * 2 records = [r for r in abstract_source.read(logger=logger_mock, config={}, catalog=catalog, state={})] - assert len(records) == 2 * (5 + SLICE_DEBUG_LOG_COUNT) + assert len(records) == 2 * (5 + SLICE_DEBUG_LOG_COUNT + TRACE_STATUS_COUNT) assert [r.record.data for r in records if r.type == Type.RECORD] == [{"value": 23}] * 2 * 5 assert http_stream.get_json_schema.call_count == 5 assert non_http_stream.get_json_schema.call_count == 5 @@ -453,6 +457,7 @@ def test_source_config_transform(mocker, abstract_source, catalog): logger_mock = mocker.MagicMock() logger_mock.level = logging.DEBUG SLICE_DEBUG_LOG_COUNT = 2 + TRACE_STATUS_COUNT = 6 streams = abstract_source.streams(None) http_stream, non_http_stream = streams http_stream.transformer = TypeTransformer(TransformConfig.DefaultSchemaNormalization) @@ -460,7 +465,7 @@ def test_source_config_transform(mocker, abstract_source, catalog): http_stream.get_json_schema.return_value = non_http_stream.get_json_schema.return_value = SCHEMA http_stream.read_records.return_value, non_http_stream.read_records.return_value = [{"value": 23}], [{"value": 23}] records = [r for r in abstract_source.read(logger=logger_mock, config={}, 
catalog=catalog, state={})] - assert len(records) == 2 + SLICE_DEBUG_LOG_COUNT + assert len(records) == 2 + SLICE_DEBUG_LOG_COUNT + TRACE_STATUS_COUNT assert [r.record.data for r in records if r.type == Type.RECORD] == [{"value": "23"}] * 2 @@ -468,13 +473,14 @@ def test_source_config_transform_and_no_transform(mocker, abstract_source, catal logger_mock = mocker.MagicMock() logger_mock.level = logging.DEBUG SLICE_DEBUG_LOG_COUNT = 2 + TRACE_STATUS_COUNT = 6 streams = abstract_source.streams(None) http_stream, non_http_stream = streams http_stream.transformer = TypeTransformer(TransformConfig.DefaultSchemaNormalization) http_stream.get_json_schema.return_value = non_http_stream.get_json_schema.return_value = SCHEMA http_stream.read_records.return_value, non_http_stream.read_records.return_value = [{"value": 23}], [{"value": 23}] records = [r for r in abstract_source.read(logger=logger_mock, config={}, catalog=catalog, state={})] - assert len(records) == 2 + SLICE_DEBUG_LOG_COUNT + assert len(records) == 2 + SLICE_DEBUG_LOG_COUNT + TRACE_STATUS_COUNT assert [r.record.data for r in records if r.type == Type.RECORD] == [{"value": "23"}, {"value": 23}] @@ -520,8 +526,8 @@ def test_read_default_http_availability_strategy_stream_available(catalog, mocke source = MockAbstractSource(streams=streams) logger = logging.getLogger(f"airbyte.{getattr(abstract_source, 'name', '')}") records = [r for r in source.read(logger=logger, config={}, catalog=catalog, state={})] - # 3 for http stream and 3 for non http stream - assert len(records) == 3 + 3 + # 3 for http stream, 3 for non http stream and 3 for stream status messages for each stream (2x) + assert len(records) == 3 + 3 + 3 + 3 assert http_stream.read_records.called assert non_http_stream.read_records.called @@ -578,8 +584,8 @@ def test_read_default_http_availability_strategy_stream_unavailable(catalog, moc with caplog.at_level(logging.WARNING): records = [r for r in source.read(logger=logger, config={}, catalog=catalog, 
state={})] - # 0 for http stream and 3 for non http stream - assert len(records) == 0 + 3 + # 0 for http stream, 3 for non http stream and 3 status trace messages + assert len(records) == 0 + 3 + 3 assert non_http_stream.read_records.called expected_logs = [ f"Skipped syncing stream '{http_stream.name}' because it was unavailable.", diff --git a/airbyte-cdk/python/unit_tests/utils/test_stream_status_utils.py b/airbyte-cdk/python/unit_tests/utils/test_stream_status_utils.py new file mode 100644 index 00000000000..d89a400d8f4 --- /dev/null +++ b/airbyte-cdk/python/unit_tests/utils/test_stream_status_utils.py @@ -0,0 +1,70 @@ +# +# Copyright (c) 2023 Airbyte, Inc., all rights reserved. +# + +from airbyte_cdk.models import ( + AirbyteMessage, + AirbyteStream, + AirbyteStreamStatus, + ConfiguredAirbyteStream, + DestinationSyncMode, + SyncMode, + TraceType, +) +from airbyte_cdk.models import Type as MessageType +from airbyte_cdk.utils.stream_status_utils import as_airbyte_message as stream_status_as_airbyte_message + +stream = AirbyteStream(name="name", namespace="namespace", json_schema={}, supported_sync_modes=[SyncMode.full_refresh]) +configured_stream = ConfiguredAirbyteStream(stream=stream, sync_mode=SyncMode.full_refresh, destination_sync_mode=DestinationSyncMode.overwrite) + + +def test_started_as_message(): + stream_status = AirbyteStreamStatus.STARTED + airbyte_message = stream_status_as_airbyte_message(configured_stream, stream_status) + + assert type(airbyte_message) == AirbyteMessage + assert airbyte_message.type == MessageType.TRACE + assert airbyte_message.trace.type == TraceType.STREAM_STATUS + assert airbyte_message.trace.emitted_at > 0 + assert airbyte_message.trace.stream_status.stream_descriptor.name == configured_stream.stream.name + assert airbyte_message.trace.stream_status.stream_descriptor.namespace == configured_stream.stream.namespace + assert airbyte_message.trace.stream_status.status == stream_status + + +def test_running_as_message(): + 
stream_status = AirbyteStreamStatus.RUNNING + airbyte_message = stream_status_as_airbyte_message(configured_stream, stream_status) + + assert type(airbyte_message) == AirbyteMessage + assert airbyte_message.type == MessageType.TRACE + assert airbyte_message.trace.type == TraceType.STREAM_STATUS + assert airbyte_message.trace.emitted_at > 0 + assert airbyte_message.trace.stream_status.stream_descriptor.name == configured_stream.stream.name + assert airbyte_message.trace.stream_status.stream_descriptor.namespace == configured_stream.stream.namespace + assert airbyte_message.trace.stream_status.status == stream_status + + +def test_complete_as_message(): + stream_status = AirbyteStreamStatus.COMPLETE + airbyte_message = stream_status_as_airbyte_message(configured_stream, stream_status) + + assert type(airbyte_message) == AirbyteMessage + assert airbyte_message.type == MessageType.TRACE + assert airbyte_message.trace.type == TraceType.STREAM_STATUS + assert airbyte_message.trace.emitted_at > 0 + assert airbyte_message.trace.stream_status.stream_descriptor.name == configured_stream.stream.name + assert airbyte_message.trace.stream_status.stream_descriptor.namespace == configured_stream.stream.namespace + assert airbyte_message.trace.stream_status.status == stream_status + + +def test_incomplete_failed_as_message(): + stream_status = AirbyteStreamStatus.INCOMPLETE + airbyte_message = stream_status_as_airbyte_message(configured_stream, stream_status) + + assert type(airbyte_message) == AirbyteMessage + assert airbyte_message.type == MessageType.TRACE + assert airbyte_message.trace.type == TraceType.STREAM_STATUS + assert airbyte_message.trace.emitted_at > 0 + assert airbyte_message.trace.stream_status.stream_descriptor.name == configured_stream.stream.name + assert airbyte_message.trace.stream_status.stream_descriptor.namespace == configured_stream.stream.namespace + assert airbyte_message.trace.stream_status.status == stream_status diff --git 
a/airbyte-config-oss/config-models-oss/src/test/java/io/airbyte/configoss/DataTypeEnumTest.java b/airbyte-config-oss/config-models-oss/src/test/java/io/airbyte/configoss/DataTypeEnumTest.java index d8010905044..9bd09dc411c 100644 --- a/airbyte-config-oss/config-models-oss/src/test/java/io/airbyte/configoss/DataTypeEnumTest.java +++ b/airbyte-config-oss/config-models-oss/src/test/java/io/airbyte/configoss/DataTypeEnumTest.java @@ -18,7 +18,7 @@ class DataTypeEnumTest { @Test void testConversionFromJsonSchemaPrimitiveToDataType() { assertEquals(5, DataType.class.getEnumConstants().length); - assertEquals(16, JsonSchemaPrimitive.class.getEnumConstants().length); + assertEquals(17, JsonSchemaPrimitive.class.getEnumConstants().length); assertEquals(DataType.STRING, DataType.fromValue(JsonSchemaPrimitive.STRING.toString().toLowerCase())); assertEquals(DataType.NUMBER, DataType.fromValue(JsonSchemaPrimitive.NUMBER.toString().toLowerCase())); diff --git a/deps.toml b/deps.toml index 5b3f23e50d0..d5a01baddd7 100644 --- a/deps.toml +++ b/deps.toml @@ -1,5 +1,5 @@ [versions] -airbyte-protocol = "1.0.0" +airbyte-protocol = "0.3.5" commons_io = "2.7" connectors-destination-testcontainers-clickhouse = "1.17.3" connectors-destination-testcontainers-elasticsearch = "1.17.3"