1
0
mirror of synced 2025-12-19 18:14:56 -05:00
Files
devin-ai-integration[bot] b3580344b7 🎉 New Source: box-data-extract (copy of #53682) (#56432)
Co-authored-by: Barduino <barduinor@gmail.com>
Co-authored-by: Marcos Marx <marcosmarxm@users.noreply.github.com>
2025-03-27 11:17:14 -03:00

265 lines
11 KiB
Python

#
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
#
import logging
from typing import Any, Dict, Iterable, List, Mapping, MutableMapping, Optional, Tuple, Union
import requests
from box_sdk_gen import BoxAPIError, BoxClient, File, Folder, Items
from airbyte_cdk.models import AirbyteMessage, AirbyteStream, ConfiguredAirbyteStream, SyncMode
from airbyte_cdk.sources import AbstractSource
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.sources.streams.http import HttpStream
from airbyte_cdk.sources.streams.http.requests_native_auth import TokenAuthenticator
from .box_api import (
box_folder_ai_ask,
box_folder_ai_extract,
box_folder_ai_extract_structured,
box_folder_text_representation,
get_box_ccg_client,
)
from .schemas import get_generic_json_schema
# Connector-wide logger, registered under the "airbyte" logger name.
logger: logging.Logger = logging.getLogger("airbyte")

# Type of each item a stream's read_records may yield:
#   - Mapping[str, Any]: the data payload of an AirbyteRecordMessage
#   - AirbyteMessage: a complete Airbyte message, which may be of any type
StreamData = Union[Mapping[str, Any], AirbyteMessage]
"""
TODO: Most comments in this file are instructive and should be deleted after the source is implemented.
This file provides a stubbed example of how to use the Airbyte CDK to develop a source connector that supports both full refresh and
incremental syncs from an HTTP API.
The various TODOs are both implementation hints and steps - fulfilling all the TODOs should be sufficient to implement one basic and one incremental
stream from a source. This pattern is the same one used by Airbyte internally to implement connectors.
The approach here is not authoritative, and devs are free to use their own judgement.
There are additional required TODOs in the files within the integration_tests folder and the spec.yaml file.
"""
# Source
class SourceBoxDataExtract(AbstractSource):
    """
    Airbyte source that reads files from a single Box folder.

    It exposes four streams: file text representations, Box AI "ask", Box AI
    "extract" and Box AI structured extraction. All of them share one Box
    CCG client built from the connector config.
    """

    def check_connection(self, logger, config) -> Tuple[bool, Any]:
        """
        Verify that the configured Box credentials can reach the Box API.

        :param logger: logger object supplied by the CDK (shadows the module-level logger here)
        :param config: the user-input config object conforming to the connector's spec.yaml
        :return: (True, None) when ``users.get_user_me()`` succeeds;
            (False, error message) when a ``BoxAPIError`` is raised.
            Any other exception propagates to the caller.
        """
        logger.info("Checking Box API connection...")
        try:
            box_client = get_box_ccg_client(config)
            user = box_client.users.get_user_me()
        except BoxAPIError as e:
            # The detailed error goes to the log; the returned message stays generic.
            logger.error(f"Unable to connect to Box API with the provided credentials - {e}")
            return False, "Unable to connect to Box API with the provided credentials"

        logger.debug(f"box_subject_type: {config.get('box_subject_type')}, box_subject_id: {config.get('box_subject_id')}")
        logger.info(f"Logged into Box as: {user.name} ({user.id} - {user.login})")
        return True, None

    def streams(self, config: Mapping[str, Any]) -> List[Stream]:
        """
        Build the connector's streams.

        :param config: A Mapping of the user input configuration as defined in the connector spec.
            Reads ``box_folder_id``, ``ask_ai_prompt``, ``extract_ai_prompt`` and
            ``extract_structured_ai_fields`` (all required; a missing key raises KeyError),
            plus the optional ``is_recursive`` flag (default False).
        :return: the text-representation, AI-ask, AI-extract and AI-structured-extract streams, in that order.
        """
        box_client = get_box_ccg_client(config)
        # Look the shared settings up once instead of once per stream.
        folder_id = config["box_folder_id"]
        is_recursive = config.get("is_recursive", False)

        return [
            StreamTextRepresentationFolder(box_client, folder_id, is_recursive=is_recursive),
            StreamAIAskFolder(box_client, folder_id, config["ask_ai_prompt"], is_recursive=is_recursive),
            StreamAIExtractFolder(box_client, folder_id, config["extract_ai_prompt"], is_recursive=is_recursive),
            StreamAIExtractStructuredFolder(
                client=box_client,
                folder_id=folder_id,
                fields_json_str=config["extract_structured_ai_fields"],
                is_recursive=is_recursive,
            ),
        ]
# Streams
class StreamTextRepresentationFolder(Stream):
    """
    Stream of the files in a Box folder, each enriched with its text representation.

    Every record is ``file.to_dict()`` for an item returned by
    ``box_folder_text_representation``, plus a ``text_representation`` key.

    params:
        client: BoxClient - authenticated Box client
        folder_id: str - ID of the Box folder to read
        is_recursive: bool - whether to read the folder recursively
    """

    client: Optional[BoxClient] = None
    folder_id: Optional[str] = None
    is_recursive: bool = False

    def __init__(self, client: BoxClient, folder_id: str, is_recursive: bool = False):
        self.client, self.folder_id, self.is_recursive = client, folder_id, is_recursive

    @property
    def primary_key(self) -> Optional[Union[str, List[str], List[List[str]]]]:
        """Records are keyed by their ``id`` field (the Box file id)."""
        return "id"

    def get_json_schema(self):
        """Return the generic JSON schema provided by ``.schemas``."""
        return get_generic_json_schema()

    def read_records(
        self,
        sync_mode: SyncMode,
        cursor_field: Optional[List[str]] = None,
        stream_slice: Optional[Mapping[str, Any]] = None,
        stream_state: Optional[Mapping[str, Any]] = None,
    ) -> Iterable[StreamData]:
        """
        Yield one record per file returned by ``box_folder_text_representation``.

        ``sync_mode``, ``cursor_field``, ``stream_slice`` and ``stream_state`` are
        accepted to satisfy the CDK interface but are not used.
        """
        mode = "recursively" if self.is_recursive else ""
        logger.info(f"Extracting text representation for files in folder {self.folder_id} {mode}")
        for entry in box_folder_text_representation(self.client, self.folder_id, is_recursive=self.is_recursive):
            record: Dict[str, Any] = entry.file.to_dict()
            record["text_representation"] = entry.text_representation
            logger.info(f"Reading file {entry.file.id} - {entry.file.name}")
            yield record
class StreamAIAskFolder(Stream):
    """
    Stream of the files in a Box folder, each paired with the result of a Box AI "ask" prompt.

    Every record is ``file.to_dict()`` for an item returned by ``box_folder_ai_ask``,
    plus a ``text_representation`` key taken from that item.
    # NOTE(review): presumably that field carries the AI answer for this stream — confirm in box_api.

    params:
        client: BoxClient - authenticated Box client
        folder_id: str - ID of the Box folder to read
        prompt: str - question sent to Box AI
        is_recursive: bool - whether to read the folder recursively
    """

    client: Optional[BoxClient] = None
    folder_id: Optional[str] = None
    is_recursive: bool = False
    prompt: Optional[str] = None

    def __init__(self, client: BoxClient, folder_id: str, prompt: str, is_recursive: bool = False):
        self.client, self.folder_id = client, folder_id
        self.is_recursive, self.prompt = is_recursive, prompt

    @property
    def primary_key(self) -> Optional[Union[str, List[str], List[List[str]]]]:
        """Records are keyed by their ``id`` field (the Box file id)."""
        return "id"

    def get_json_schema(self):
        """Return the generic JSON schema provided by ``.schemas``."""
        return get_generic_json_schema()

    def read_records(
        self,
        sync_mode: SyncMode,
        cursor_field: Optional[List[str]] = None,
        stream_slice: Optional[Mapping[str, Any]] = None,
        stream_state: Optional[Mapping[str, Any]] = None,
    ) -> Iterable[StreamData]:
        """
        Yield one record per file returned by ``box_folder_ai_ask`` for ``self.prompt``.

        ``sync_mode``, ``cursor_field``, ``stream_slice`` and ``stream_state`` are
        accepted to satisfy the CDK interface but are not used.
        """
        mode = "recursively" if self.is_recursive else ""
        logger.info(f"Asking AI {self.prompt} for all files in folder {self.folder_id} {mode}")
        answers = box_folder_ai_ask(self.client, self.folder_id, prompt=self.prompt, is_recursive=self.is_recursive)
        for answer in answers:
            record: Dict[str, Any] = answer.file.to_dict()
            record["text_representation"] = answer.text_representation
            logger.info(f"Reading file {answer.file.id} - {answer.file.name}")
            yield record
class StreamAIExtractFolder(Stream):
    """
    Stream of the files in a Box folder, each paired with the result of a Box AI "extract" prompt.

    Every record is ``file.to_dict()`` for an item returned by ``box_folder_ai_extract``,
    plus a ``text_representation`` key taken from that item.
    # NOTE(review): presumably that field carries the extraction result for this stream — confirm in box_api.

    params:
        client: BoxClient - authenticated Box client
        folder_id: str - ID of the Box folder to read
        prompt: str - extraction prompt sent to Box AI
        is_recursive: bool - whether to read the folder recursively
    """

    client: Optional[BoxClient] = None
    folder_id: Optional[str] = None
    is_recursive: bool = False
    prompt: Optional[str] = None

    def __init__(self, client: BoxClient, folder_id: str, prompt: str, is_recursive: bool = False):
        self.client, self.folder_id = client, folder_id
        self.is_recursive, self.prompt = is_recursive, prompt

    @property
    def primary_key(self) -> Optional[Union[str, List[str], List[List[str]]]]:
        """Records are keyed by their ``id`` field (the Box file id)."""
        return "id"

    def get_json_schema(self):
        """Return the generic JSON schema provided by ``.schemas``."""
        return get_generic_json_schema()

    def read_records(
        self,
        sync_mode: SyncMode,
        cursor_field: Optional[List[str]] = None,
        stream_slice: Optional[Mapping[str, Any]] = None,
        stream_state: Optional[Mapping[str, Any]] = None,
    ) -> Iterable[StreamData]:
        """
        Yield one record per file returned by ``box_folder_ai_extract`` for ``self.prompt``.

        ``sync_mode``, ``cursor_field``, ``stream_slice`` and ``stream_state`` are
        accepted to satisfy the CDK interface but are not used.
        """
        mode = "recursively" if self.is_recursive else ""
        logger.info(f"Extracting AI {self.prompt} for all files in folder {self.folder_id} {mode}")
        extractions = box_folder_ai_extract(self.client, self.folder_id, prompt=self.prompt, is_recursive=self.is_recursive)
        for extraction in extractions:
            record: Dict[str, Any] = extraction.file.to_dict()
            record["text_representation"] = extraction.text_representation
            logger.info(f"Reading file {extraction.file.id} - {extraction.file.name}")
            yield record
class StreamAIExtractStructuredFolder(Stream):
    """
    Stream of the files in a Box folder, each paired with a Box AI structured extraction.

    Every record is ``file.to_dict()`` for an item returned by
    ``box_folder_ai_extract_structured``, plus a ``text_representation`` key taken from that item.
    # NOTE(review): presumably that field carries the structured extraction result — confirm in box_api.

    params:
        client: BoxClient - authenticated Box client
        folder_id: str - ID of the Box folder to read
        fields_json_str: str - field definitions passed through to Box AI as a string
            (the name suggests JSON; parsing happens in box_api — TODO confirm format)
        is_recursive: bool - whether to read the folder recursively
    """

    client: Optional[BoxClient] = None
    folder_id: Optional[str] = None
    is_recursive: bool = False
    fields_json_str: Optional[str] = None

    def __init__(self, client: BoxClient, folder_id: str, fields_json_str: str, is_recursive: bool = False):
        self.client = client
        self.folder_id = folder_id
        self.is_recursive = is_recursive
        self.fields_json_str = fields_json_str

    @property
    def primary_key(self) -> Optional[Union[str, List[str], List[List[str]]]]:
        """Records are keyed by their ``id`` field (the Box file id)."""
        return "id"

    def get_json_schema(self):
        """Return the generic JSON schema provided by ``.schemas``."""
        return get_generic_json_schema()

    def read_records(
        self,
        sync_mode: SyncMode,
        cursor_field: Optional[List[str]] = None,
        stream_slice: Optional[Mapping[str, Any]] = None,
        stream_state: Optional[Mapping[str, Any]] = None,
    ) -> Iterable[StreamData]:
        """
        Yield one record per file returned by ``box_folder_ai_extract_structured``.

        ``sync_mode``, ``cursor_field``, ``stream_slice`` and ``stream_state`` are
        accepted to satisfy the CDK interface but are not used.
        """
        logger.info(
            f"Extracting Structured AI {self.fields_json_str} for all files in folder {self.folder_id} {'recursively' if self.is_recursive else ''}"
        )
        items = box_folder_ai_extract_structured(
            self.client, self.folder_id, fields_json_str=self.fields_json_str, is_recursive=self.is_recursive
        )
        for item in items:
            airbyte_item: Dict[str, Any] = item.file.to_dict()
            airbyte_item["text_representation"] = item.text_representation
            logger.info(f"Reading file {item.file.id} - {item.file.name}")
            yield airbyte_item