diff --git a/airbyte-cdk/python/airbyte_cdk/sources/streams/concurrent/partition_enqueuer.py b/airbyte-cdk/python/airbyte_cdk/sources/streams/concurrent/partition_enqueuer.py index 55b97474b38..9358e1d09c0 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/streams/concurrent/partition_enqueuer.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/streams/concurrent/partition_enqueuer.py @@ -30,7 +30,6 @@ class PartitionEnqueuer: This method is meant to be called in a separate thread. :param partition_generator: The partition Generator - :param sync_mode: The sync mode used :return: """ try: diff --git a/airbyte-cdk/python/airbyte_cdk/sources/streams/concurrent/thread_based_concurrent_stream.py b/airbyte-cdk/python/airbyte_cdk/sources/streams/concurrent/thread_based_concurrent_stream.py index 54691b6ee14..44aa69de491 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/streams/concurrent/thread_based_concurrent_stream.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/streams/concurrent/thread_based_concurrent_stream.py @@ -127,6 +127,7 @@ class ThreadBasedConcurrentStream(AbstractStream): if finished_partitions and all(partitions_to_done.values()): # All partitions were generated and process. We're done here break + self._check_for_errors(futures) def _submit_task(self, futures: List[Future[Any]], function: Callable[..., Any], *args: Any) -> None: @@ -137,12 +138,32 @@ class ThreadBasedConcurrentStream(AbstractStream): def _wait_while_too_many_pending_futures(self, futures: List[Future[Any]]) -> None: # Wait until the number of pending tasks is < self._max_concurrent_tasks while True: - pending_futures = [f for f in futures if not f.done()] - if len(pending_futures) < self._max_concurrent_tasks: + self._prune_futures(futures) + if len(futures) < self._max_concurrent_tasks: break self._logger.info("Main thread is sleeping because the task queue is full...") time.sleep(self._sleep_time) + def _prune_futures(self, futures: List[Future[Any]]) -> None: + """ + Take a list in input and remove the futures that are completed. If a future has an exception, it'll raise and kill the stream + operation. + + Pruning this list safely relies on the assumptions that only the main thread can modify the list of futures. + """ + if len(futures) < self._max_concurrent_tasks: + return + + for index in range(len(futures)): + future = futures[index] + optional_exception = future.exception() + if optional_exception: + exception = RuntimeError(f"Failed reading from stream {self.name} with error: {optional_exception}") + self._stop_and_raise_exception(exception) + + if future.done(): + futures.pop(index) + def _check_for_errors(self, futures: List[Future[Any]]) -> None: exceptions_from_futures = [f for f in [future.exception() for future in futures] if f is not None] if exceptions_from_futures: diff --git a/airbyte-cdk/python/unit_tests/sources/streams/concurrent/test_thread_based_concurrent_stream.py b/airbyte-cdk/python/unit_tests/sources/streams/concurrent/test_thread_based_concurrent_stream.py index a0e9e423521..152560ee629 100644 --- a/airbyte-cdk/python/unit_tests/sources/streams/concurrent/test_thread_based_concurrent_stream.py +++ b/airbyte-cdk/python/unit_tests/sources/streams/concurrent/test_thread_based_concurrent_stream.py @@ -5,6 +5,7 @@ import unittest from unittest.mock import Mock, call +import pytest from airbyte_cdk.models import AirbyteStream, SyncMode from airbyte_cdk.sources.streams.concurrent.availability_strategy import STREAM_AVAILABLE from airbyte_cdk.sources.streams.concurrent.cursor import Cursor @@ -127,13 +128,29 @@ class ThreadBasedConcurrentStreamTest(unittest.TestCase): # Verify that the done() method will be called until only one future is still running f1.done.side_effect = [False, False] + f1.exception.return_value = None f2.done.side_effect = [False, True] + f2.exception.return_value = None futures = [f1, f2] self._stream._wait_while_too_many_pending_futures(futures) f1.done.assert_has_calls([call(), call()]) f2.done.assert_has_calls([call(), call()]) + def test_given_exception_then_fail_immediately(self): + f1 = Mock() + f2 = Mock() + + # Verify that the done() method will be called until only one future is still running + f1.done.return_value = False + f1.exception.return_value = None + f2.done.return_value = False + f2.exception.return_value = ValueError("An exception") + futures = [f1, f2] + + with pytest.raises(RuntimeError): + self._stream._wait_while_too_many_pending_futures(futures) + def test_as_airbyte_stream(self): expected_airbyte_stream = AirbyteStream( name=self._name,