[airbyte-cdk] add incomplete status to availability check during read (#41034)
This commit is contained in:
@@ -131,6 +131,7 @@ class AbstractSource(Source, ABC):
|
||||
stream_is_available, reason = stream_instance.check_availability(logger, self)
|
||||
if not stream_is_available:
|
||||
logger.warning(f"Skipped syncing stream '{stream_instance.name}' because it was unavailable. {reason}")
|
||||
yield stream_status_as_airbyte_message(configured_stream.stream, AirbyteStreamStatus.INCOMPLETE)
|
||||
continue
|
||||
logger.info(f"Marking stream {configured_stream.stream.name} as STARTED")
|
||||
yield stream_status_as_airbyte_message(configured_stream.stream, AirbyteStreamStatus.STARTED)
|
||||
|
||||
@@ -4,9 +4,9 @@
|
||||
import concurrent
|
||||
import logging
|
||||
from queue import Queue
|
||||
from typing import Iterable, Iterator, List
|
||||
from typing import Iterable, Iterator, List, Optional, Tuple
|
||||
|
||||
from airbyte_cdk.models import AirbyteMessage
|
||||
from airbyte_cdk.models import AirbyteMessage, AirbyteStreamStatus
|
||||
from airbyte_cdk.sources.concurrent_source.concurrent_read_processor import ConcurrentReadProcessor
|
||||
from airbyte_cdk.sources.concurrent_source.partition_generation_completed_sentinel import PartitionGenerationCompletedSentinel
|
||||
from airbyte_cdk.sources.concurrent_source.stream_thread_exception import StreamThreadException
|
||||
@@ -19,6 +19,7 @@ from airbyte_cdk.sources.streams.concurrent.partitions.partition import Partitio
|
||||
from airbyte_cdk.sources.streams.concurrent.partitions.record import Record
|
||||
from airbyte_cdk.sources.streams.concurrent.partitions.types import PartitionCompleteSentinel, QueueItem
|
||||
from airbyte_cdk.sources.utils.slice_logger import DebugSliceLogger, SliceLogger
|
||||
from airbyte_cdk.utils.stream_status_utils import as_airbyte_message as stream_status_as_airbyte_message
|
||||
|
||||
|
||||
class ConcurrentSource:
|
||||
@@ -80,7 +81,11 @@ class ConcurrentSource:
|
||||
streams: List[AbstractStream],
|
||||
) -> Iterator[AirbyteMessage]:
|
||||
self._logger.info("Starting syncing")
|
||||
stream_instances_to_read_from = self._get_streams_to_read_from(streams)
|
||||
stream_instances_to_read_from, not_available_streams = self._get_streams_to_read_from(streams)
|
||||
|
||||
for stream, message in not_available_streams:
|
||||
self._logger.warning(f"Skipped syncing stream '{stream.name}' because it was unavailable. {message}")
|
||||
yield stream_status_as_airbyte_message(stream, AirbyteStreamStatus.INCOMPLETE)
|
||||
|
||||
# Return early if there are no streams to read from
|
||||
if not stream_instances_to_read_from:
|
||||
@@ -151,7 +156,9 @@ class ConcurrentSource:
|
||||
else:
|
||||
raise ValueError(f"Unknown queue item type: {type(queue_item)}")
|
||||
|
||||
def _get_streams_to_read_from(self, streams: List[AbstractStream]) -> List[AbstractStream]:
|
||||
def _get_streams_to_read_from(
|
||||
self, streams: List[AbstractStream]
|
||||
) -> Tuple[List[AbstractStream], List[Tuple[AbstractStream, Optional[str]]]]:
|
||||
"""
|
||||
Iterate over the configured streams and return a list of streams to read from.
|
||||
If a stream is not configured, it will be skipped.
|
||||
@@ -159,10 +166,11 @@ class ConcurrentSource:
|
||||
If a stream is not available, it will be skipped
|
||||
"""
|
||||
stream_instances_to_read_from = []
|
||||
not_available_streams = []
|
||||
for stream in streams:
|
||||
stream_availability = stream.check_availability()
|
||||
if not stream_availability.is_available():
|
||||
self._logger.warning(f"Skipped syncing stream '{stream.name}' because it was unavailable. {stream_availability.message()}")
|
||||
not_available_streams.append((stream, stream_availability.message()))
|
||||
continue
|
||||
stream_instances_to_read_from.append(stream)
|
||||
return stream_instances_to_read_from
|
||||
return stream_instances_to_read_from, not_available_streams
|
||||
|
||||
@@ -57,7 +57,7 @@ class _MockSource(ConcurrentSourceAdapter):
|
||||
|
||||
|
||||
@freezegun.freeze_time("2020-01-01T00:00:00")
|
||||
def test_concurrent_source_adapter():
|
||||
def test_concurrent_source_adapter(as_stream_status, remove_stack_trace):
|
||||
concurrent_source = Mock()
|
||||
message_from_concurrent_stream = AirbyteMessage(
|
||||
type=MessageType.RECORD,
|
||||
@@ -76,6 +76,7 @@ def test_concurrent_source_adapter():
|
||||
adapter = _MockSource(concurrent_source, {regular_stream: False, concurrent_stream: True, unavailable_stream: False}, logger)
|
||||
|
||||
messages = list(adapter.read(logger, {}, _configured_catalog([regular_stream, concurrent_stream, unavailable_stream])))
|
||||
|
||||
records = [m for m in messages if m.type == MessageType.RECORD]
|
||||
|
||||
expected_records = [
|
||||
@@ -92,6 +93,12 @@ def test_concurrent_source_adapter():
|
||||
|
||||
assert records == expected_records
|
||||
|
||||
unavailable_stream_trace_messages = [m for m in messages if m.type == MessageType.TRACE and m.trace.stream_status.status == AirbyteStreamStatus.INCOMPLETE]
|
||||
expected_status = [as_stream_status("s3", AirbyteStreamStatus.INCOMPLETE)]
|
||||
|
||||
assert len(unavailable_stream_trace_messages) == 1
|
||||
assert unavailable_stream_trace_messages[0].trace.stream_status == expected_status[0].trace.stream_status
|
||||
|
||||
|
||||
def _mock_stream(name: str, data=[], available: bool = True):
|
||||
s = Mock()
|
||||
|
||||
@@ -17,6 +17,7 @@ from airbyte_cdk.models import (
|
||||
AirbyteStateMessage,
|
||||
AirbyteStateType,
|
||||
AirbyteStreamState,
|
||||
AirbyteStreamStatus,
|
||||
ConfiguredAirbyteCatalog,
|
||||
StreamDescriptor,
|
||||
SyncMode,
|
||||
@@ -567,7 +568,7 @@ def test_read_default_http_availability_strategy_stream_available(catalog, mocke
|
||||
assert non_http_stream.read_records.called
|
||||
|
||||
|
||||
def test_read_default_http_availability_strategy_stream_unavailable(catalog, mocker, caplog):
|
||||
def test_read_default_http_availability_strategy_stream_unavailable(catalog, mocker, caplog, remove_stack_trace, as_stream_status):
|
||||
mocker.patch.multiple(Stream, __abstractmethods__=set())
|
||||
|
||||
class MockHttpStream(HttpStream):
|
||||
@@ -619,8 +620,8 @@ def test_read_default_http_availability_strategy_stream_unavailable(catalog, moc
|
||||
with caplog.at_level(logging.WARNING):
|
||||
records = [r for r in source.read(logger=logger, config={}, catalog=catalog, state={})]
|
||||
|
||||
# 0 for http stream, 3 for non http stream, 1 for non http stream state message and 3 status trace messages
|
||||
assert len(records) == 0 + 3 + 1 + 3
|
||||
# 0 for http stream, 3 for non http stream, 1 for non http stream state message and 4 status trace messages
|
||||
assert len(records) == 0 + 3 + 1 + 4
|
||||
assert non_http_stream.read_records.called
|
||||
expected_logs = [
|
||||
f"Skipped syncing stream '{http_stream.name}' because it was unavailable.",
|
||||
@@ -630,8 +631,12 @@ def test_read_default_http_availability_strategy_stream_unavailable(catalog, moc
|
||||
for message in expected_logs:
|
||||
assert message in caplog.text
|
||||
|
||||
expected_status = [as_stream_status(f"{http_stream.name}", AirbyteStreamStatus.INCOMPLETE)]
|
||||
records = [remove_stack_trace(record) for record in records]
|
||||
assert records[0].trace.stream_status == expected_status[0].trace.stream_status
|
||||
|
||||
def test_read_default_http_availability_strategy_parent_stream_unavailable(catalog, mocker, caplog):
|
||||
|
||||
def test_read_default_http_availability_strategy_parent_stream_unavailable(catalog, mocker, caplog, remove_stack_trace, as_stream_status):
|
||||
"""Test default availability strategy if error happens during slice extraction (reading of parent stream)"""
|
||||
mocker.patch.multiple(Stream, __abstractmethods__=set())
|
||||
|
||||
@@ -702,8 +707,9 @@ def test_read_default_http_availability_strategy_parent_stream_unavailable(catal
|
||||
with caplog.at_level(logging.WARNING):
|
||||
records = [r for r in source.read(logger=logger, config={}, catalog=catalog, state={})]
|
||||
|
||||
# 0 for http stream, 3 for non http stream and 3 status trace messages
|
||||
assert len(records) == 0
|
||||
# 0 for http stream, 0 for non http stream and 1 status trace messages
|
||||
assert len(records) == 1
|
||||
|
||||
expected_logs = [
|
||||
f"Skipped syncing stream '{http_stream.name}' because it was unavailable.",
|
||||
"Forbidden.",
|
||||
@@ -711,3 +717,7 @@ def test_read_default_http_availability_strategy_parent_stream_unavailable(catal
|
||||
]
|
||||
for message in expected_logs:
|
||||
assert message in caplog.text
|
||||
|
||||
expected_status = [as_stream_status(f"{http_stream.name}", AirbyteStreamStatus.INCOMPLETE)]
|
||||
records = [remove_stack_trace(record) for record in records]
|
||||
assert records[0].trace.stream_status == expected_status[0].trace.stream_status
|
||||
|
||||
Reference in New Issue
Block a user