1
0
mirror of synced 2026-02-02 07:01:59 -05:00
Files
airbyte/airbyte-cdk/python/airbyte_cdk/sources/streams/concurrent/partition_reader.py
Alexandre Girard 25fc396cdf CDK: ThreadBasedConcurrentStream skeleton and top-level AbstractStream (#30111)
Co-authored-by: girarda <girarda@users.noreply.github.com>
Co-authored-by: Maxime Carbonneau-Leclerc <maxi297@users.noreply.github.com>
Co-authored-by: Catherine Noll <clnoll@users.noreply.github.com>
2023-10-11 16:46:02 -07:00

34 lines
1.1 KiB
Python

#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
from queue import Queue
from airbyte_cdk.sources.streams.concurrent.partitions.partition import Partition
from airbyte_cdk.sources.streams.concurrent.partitions.types import PartitionCompleteSentinel, QueueItem
class PartitionReader:
"""
Generates records from a partition and puts them in a queuea.
"""
def __init__(self, queue: Queue[QueueItem]) -> None:
"""
:param queue: The queue to put the records in.
"""
self._queue = queue
def process_partition(self, partition: Partition) -> None:
"""
Process a partition and put the records in the output queue.
When all the partitions are added to the queue, a sentinel is added to the queue to indicate that all the partitions have been generated.
This method is meant to be called from a thread.
:param partition: The partition to read data from
:return: None
"""
for record in partition.read():
self._queue.put(record)
self._queue.put(PartitionCompleteSentinel(partition))