committed by
GitHub
parent
3465f3b5a7
commit
71d50635cc
@@ -30,7 +30,6 @@ class PartitionEnqueuer:
|
||||
|
||||
This method is meant to be called in a separate thread.
|
||||
:param partition_generator: The partition Generator
|
||||
:param sync_mode: The sync mode used
|
||||
:return:
|
||||
"""
|
||||
try:
|
||||
|
||||
@@ -127,6 +127,7 @@ class ThreadBasedConcurrentStream(AbstractStream):
|
||||
if finished_partitions and all(partitions_to_done.values()):
|
||||
# All partitions were generated and process. We're done here
|
||||
break
|
||||
|
||||
self._check_for_errors(futures)
|
||||
|
||||
def _submit_task(self, futures: List[Future[Any]], function: Callable[..., Any], *args: Any) -> None:
|
||||
@@ -137,12 +138,32 @@ class ThreadBasedConcurrentStream(AbstractStream):
|
||||
def _wait_while_too_many_pending_futures(self, futures: List[Future[Any]]) -> None:
|
||||
# Wait until the number of pending tasks is < self._max_concurrent_tasks
|
||||
while True:
|
||||
pending_futures = [f for f in futures if not f.done()]
|
||||
if len(pending_futures) < self._max_concurrent_tasks:
|
||||
self._prune_futures(futures)
|
||||
if len(futures) < self._max_concurrent_tasks:
|
||||
break
|
||||
self._logger.info("Main thread is sleeping because the task queue is full...")
|
||||
time.sleep(self._sleep_time)
|
||||
|
||||
def _prune_futures(self, futures: List[Future[Any]]) -> None:
|
||||
"""
|
||||
Take a list in input and remove the futures that are completed. If a future has an exception, it'll raise and kill the stream
|
||||
operation.
|
||||
|
||||
Pruning this list safely relies on the assumptions that only the main thread can modify the list of futures.
|
||||
"""
|
||||
if len(futures) < self._max_concurrent_tasks:
|
||||
return
|
||||
|
||||
for index in range(len(futures)):
|
||||
future = futures[index]
|
||||
optional_exception = future.exception()
|
||||
if optional_exception:
|
||||
exception = RuntimeError(f"Failed reading from stream {self.name} with error: {optional_exception}")
|
||||
self._stop_and_raise_exception(exception)
|
||||
|
||||
if future.done():
|
||||
futures.pop(index)
|
||||
|
||||
def _check_for_errors(self, futures: List[Future[Any]]) -> None:
|
||||
exceptions_from_futures = [f for f in [future.exception() for future in futures] if f is not None]
|
||||
if exceptions_from_futures:
|
||||
|
||||
Reference in New Issue
Block a user