1
0
mirror of synced 2025-12-20 02:23:30 -05:00
Files
2025-10-27 12:40:46 +02:00

592 lines
25 KiB
Python

#
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
#
import json
import logging
import os
import sys
import tempfile
import traceback
import urllib
import zipfile
from os import environ
from typing import Iterable
from urllib.parse import urlparse
from zipfile import BadZipFile
import backoff
import boto3
import botocore
import google
import numpy as np
import pandas as pd
import smart_open
import smart_open.ssh
from azure.storage.blob import BlobServiceClient
from genson import SchemaBuilder
from google.cloud.storage import Client as GCSClient
from google.oauth2 import service_account
from openpyxl import load_workbook
from openpyxl.utils.exceptions import InvalidFileException
from pandas.errors import ParserError
from paramiko import SSHException
from urllib3.exceptions import ProtocolError
from yaml import safe_load
from airbyte_cdk.entrypoint import logger
from airbyte_cdk.models import AirbyteStream, FailureType, SyncMode
from airbyte_cdk.utils import AirbyteTracedException, is_cloud_environment
from .utils import LOCAL_STORAGE_NAME, backoff_handler
# Value passed as the "timeout" transport parameter when opening ssh/scp/sftp URLs.
# NOTE(review): presumably expressed in seconds - confirm against smart_open's SSH transport.
SSH_TIMEOUT = 60

# Force the log level of the smart-open logger to ERROR - https://github.com/airbytehq/airbyte/pull/27157
_smart_open_logger = logging.getLogger("smart_open")
_smart_open_logger.setLevel(logging.ERROR)
class URLFile:
    """Class to manage read from file located at different providers

    Supported examples of URL this class can accept are as follows:
    ```
        s3://my_bucket/my_key
        s3://my_key:my_secret@my_bucket/my_key
        gs://my_bucket/my_blob
        hdfs:///path/file (not tested)
        hdfs://path/file (not tested)
        webhdfs://host:port/path/file (not tested)
        ./local/path/file
        ~/local/path/file
        local/path/file
        ./local/path/file.gz
        file:///home/user/file
        file:///home/user/file.bz2
        [ssh|scp|sftp]://username@host//path/file
        [ssh|scp|sftp]://username@host/path/file
        [ssh|scp|sftp]://username:password@host/path/file
    ```

    The instance is used as ``with URLFile(...).open() as fp``: ``open()`` returns the
    instance itself and ``__enter__`` hands out the underlying file object.
    """

    # Maps the upper-cased provider "storage" name to the URL prefix used to open the file.
    # Note that both "SSH" and "SCP" map to the scp:// prefix.
    _STORAGE_SCHEMES = {
        "GCS": "gs://",
        "S3": "s3://",
        "AZBLOB": "azure://",
        "HTTPS": "https://",
        "SSH": "scp://",
        "SCP": "scp://",
        "SFTP": "sftp://",
        "WEBHDFS": "webhdfs://",
        "LOCAL": "file://",
    }

    def __init__(self, url: str, provider: dict, binary=None, encoding=None):
        """
        :param url: location of the file, with or without a scheme prefix
        :param provider: provider configuration; must contain the "storage" key, other keys depend on the storage
        :param binary: when truthy the file is opened in "rb" mode and the encoding is ignored
        :param encoding: text encoding passed to smart_open when the file is opened in text mode
        """
        self._url = url
        self._provider = provider
        self._file = None
        # Keyword arguments forwarded to every smart_open.open call
        self.args = {
            "mode": "rb" if binary else "r",
            "encoding": None if binary else encoding,
        }

    def __enter__(self):
        # The context manager yields the opened file object, not the URLFile wrapper
        return self._file

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()

    @property
    def full_url(self):
        """URL made of the storage prefix followed by the scheme-less location"""
        return f"{self.storage_scheme}{self.url}"

    def close(self):
        """Close the underlying file object, if one is currently open"""
        if self._file:
            self._file.close()
            self._file = None

    def backoff_giveup(self, error):
        """Tell backoff whether to stop retrying: only the SSH banner error is retried

        :param error: exception raised by the last opening attempt
        :return: False to retry (SSH banner error), True to give up
        """
        # https://github.com/airbytehq/oncall/issues/1954
        if isinstance(error, SSHException) and str(error).startswith("Error reading SSH protocol banner"):
            # We need to clear smart_open internal _SSH cache from the previous attempt, otherwise:
            # SSHException('SSH session not active')
            # will be raised
            smart_open.ssh._SSH.clear()
            return False
        return True

    def open(self):
        """Open the file (closing any previously opened one) and return this instance

        :raises FileNotFoundError: when the GCS API reports that the object does not exist
        """
        self.close()
        _open = backoff.on_exception(backoff.expo, Exception, max_tries=5, giveup=self.backoff_giveup)(self._open)
        try:
            self._file = _open()
        except google.api_core.exceptions.NotFound as err:
            raise FileNotFoundError(self.url) from err
        return self

    def _open(self):
        """Dispatch to the opener matching the storage prefix and return the opened file object"""
        storage = self.storage_scheme

        if storage == "gs://":
            return self._open_gcs_url()
        if storage == "s3://":
            return self._open_aws_url()
        if storage == "azure://":
            return self._open_azblob_url()
        if storage == "webhdfs://":
            host = self._provider["host"]
            port = self._provider["port"]
            return smart_open.open(f"webhdfs://{host}:{port}/{self.url}", **self.args)
        if storage in ("ssh://", "scp://", "sftp://"):
            return self._open_ssh_url(storage)
        if storage in ("https://", "http://"):
            return self._open_http_url()
        return smart_open.open(self.full_url, **self.args)

    def _open_ssh_url(self, storage: str):
        """Open a file over ssh/scp/sftp using the user, host, port and optional password of the provider"""
        # We need to quote parameters to deal with special characters
        # https://bugs.python.org/issue18140
        user = urllib.parse.quote(self._provider["user"])
        host = urllib.parse.quote(self._provider["host"])
        url = urllib.parse.quote(self.url)
        # TODO: Remove int casting when https://github.com/airbytehq/airbyte/issues/4952 is addressed
        # TODO: The "port" field in spec.json must also be changed
        _port_value = self._provider.get("port", 22)
        try:
            port = int(_port_value)
        except (TypeError, ValueError) as err:
            # TypeError covers values such as None, which int() rejects with a less helpful message
            raise ValueError(f"{_port_value} is not a valid integer for the port") from err
        # Explicitly turn off ssh keys stored in ~/.ssh
        transport_params = {"connect_kwargs": {"look_for_keys": False}, "timeout": SSH_TIMEOUT}
        if "password" in self._provider:
            password = urllib.parse.quote(self._provider["password"])
            uri = f"{storage}{user}:{password}@{host}:{port}/{url}"
        else:
            uri = f"{storage}{user}@{host}:{port}/{url}"
        return smart_open.open(uri, transport_params=transport_params, **self.args)

    def _open_http_url(self):
        """Open a file over http(s), adding Airbyte request headers when "user_agent" is enabled"""
        transport_params = None
        if self._provider.get("user_agent"):
            airbyte_version = environ.get("AIRBYTE_VERSION", "0.0")
            transport_params = {"headers": {"Accept-Encoding": "identity", "User-Agent": f"Airbyte/{airbyte_version}"}}
            logger.info(f"TransportParams: {transport_params}")
        return smart_open.open(self.full_url, transport_params=transport_params, **self.args)

    @property
    def url(self) -> str:
        """Convert URL to remove the URL prefix (scheme)

        :return: the corresponding URL without URL prefix / scheme
        """
        if urlparse(self._url).scheme:
            # Split only on the first "://" so that URLs embedded later on
            # (e.g. in a query string) are kept intact.
            return self._url.split("://", 1)[-1]
        return self._url

    @property
    def storage_scheme(self) -> str:
        """Convert Storage Names to the proper URL Prefix

        :return: the corresponding URL prefix / scheme, or an empty string when it cannot be determined
        """
        storage_name = self._provider["storage"].upper()
        known_prefix = self._STORAGE_SCHEMES.get(storage_name)
        if known_prefix:
            return known_prefix

        # Unknown storage name: fall back to the scheme of the URL itself. The "://" separator
        # must be appended because `url` strips it together with the scheme.
        scheme = urlparse(self._url).scheme
        if scheme:
            return f"{scheme}://"
        logger.error(f"Unknown Storage provider in: {self._url}")
        return ""

    def _open_gcs_url(self) -> object:
        """Open a GCS blob with service account credentials when provided, anonymously otherwise

        :raises AirbyteTracedException: (config error) when "service_account_json" is not valid JSON
        """
        service_account_json = self._provider.get("service_account_json")
        account_info = None
        if service_account_json:
            try:
                account_info = json.loads(service_account_json)
            except json.decoder.JSONDecodeError as err:
                error_msg = f"Failed to parse gcs service account json: {repr(err)}"
                logger.error(f"{error_msg}\n{traceback.format_exc()}")
                raise AirbyteTracedException(message=error_msg, internal_message=error_msg, failure_type=FailureType.config_error) from err

        if account_info:
            credentials = service_account.Credentials.from_service_account_info(account_info)
            client = GCSClient(credentials=credentials, project=credentials.project_id)
        else:
            client = GCSClient.create_anonymous_client()
        return smart_open.open(self.full_url, transport_params={"client": client}, **self.args)

    def _open_aws_url(self):
        """Open an S3 object with the provided key pair, or with unsigned requests when none is given"""
        aws_access_key_id = self._provider.get("aws_access_key_id")
        aws_secret_access_key = self._provider.get("aws_secret_access_key")

        if aws_access_key_id and aws_secret_access_key:
            # Credentials are embedded in the URL: s3://<key id>:<secret>@<bucket>/<key>
            url = f"{self.storage_scheme}{aws_access_key_id}:{aws_secret_access_key}@{self.url}"
            return smart_open.open(url, **self.args)

        # No (complete) key pair: send unsigned requests
        config = botocore.client.Config(signature_version=botocore.UNSIGNED)
        params = {"client": boto3.client("s3", config=config)}
        return smart_open.open(self.full_url, transport_params=params, **self.args)

    def _open_azblob_url(self):
        """Open an Azure blob using the shared key, the SAS token, or no credential at all"""
        storage_account = self._provider.get("storage_account")
        storage_acc_url = f"https://{storage_account}.blob.core.windows.net"
        sas_token = self._provider.get("sas_token", None)
        shared_key = self._provider.get("shared_key", None)
        # if both keys are provided, shared_key is preferred as has permissions on entire storage account
        credential = shared_key or sas_token

        if credential:
            client = BlobServiceClient(account_url=storage_acc_url, credential=credential)
        else:
            # assuming anonymous public read access given no credential
            client = BlobServiceClient(account_url=storage_acc_url)

        return smart_open.open(self.full_url, transport_params=dict(client=client), **self.args)
class Client:
    """Class that manages reading and parsing data from streams"""

    CSV_CHUNK_SIZE = 10_000
    # Number of bytes requested per read when a remote stream is copied to a local temp file
    CACHE_COPY_BUFFER_SIZE = 1024 * 1024
    binary_formats = {"excel", "excel_binary", "feather", "parquet", "orc", "pickle"}

    def __init__(self, dataset_name: str, url: str, provider: dict, format: str = None, reader_options: dict = None):
        """
        :param dataset_name: name of the emitted stream; when empty a name is derived from storage and format
        :param url: location of the file to read
        :param provider: provider configuration, forwarded to the reader class
        :param format: reader format, defaults to "csv"
        :param reader_options: extra keyword arguments forwarded to the pandas reader; "encoding" is also used to open the file
        """
        self._dataset_name = dataset_name
        self._url = url
        self._provider = provider
        self._reader_format = format or "csv"
        self._reader_options = reader_options or {}
        self._is_zip = url.lower().endswith(".zip")
        # Zip archives are always read as bytes, whatever the format of the file inside
        self.binary_source = self._reader_format in self.binary_formats or self._is_zip
        self.encoding = self._reader_options.get("encoding")
        # Holds the TemporaryDirectory of the last extracted zip archive (see _unzip)
        self._tmp_dir = None

    @property
    def reader_class(self):
        """Class used to open the file: the local-file-refusing variant in cloud environments"""
        if is_cloud_environment():
            return URLFileSecure
        return URLFile

    @property
    def stream_name(self) -> str:
        """Configured dataset name, or "file_<storage>.<format>" when none is set"""
        if self._dataset_name:
            return self._dataset_name
        return f"file_{self._provider['storage']}.{self._reader_format}"

    def load_nested_json_schema(self, fp) -> dict:
        """Infer a JSON schema (draft-07) from the content of a json / jsonl file

        :param fp: opened file object; only read for the "json" format
        :return: schema of a single record
        """
        # Use Genson Library to take JSON objects and generate schemas that describe them,
        builder = SchemaBuilder()
        if self._reader_format == "jsonl":
            # NOTE(review): read() opens the source again instead of consuming `fp`
            for o in self.read():
                builder.add_object(o)
        else:
            builder.add_object(json.load(fp))

        result = builder.to_schema()
        if "items" in result:
            # this means we have a json list e.g. [{...}, {...}]
            # but need to emit schema of an inside dict
            result = result["items"]
        result["$schema"] = "http://json-schema.org/draft-07/schema#"
        return result

    def load_nested_json(self, fp) -> list:
        """Load the records of a json / jsonl file

        :param fp: opened file object
        :return: list of parsed records; a single json document that is not a list is wrapped into one
        """
        if self._reader_format == "jsonl":
            result = []
            line = fp.readline()
            while line:
                # Blank lines (e.g. a trailing newline at the end of the file) carry no record
                if line.strip():
                    result.append(json.loads(line))
                line = fp.readline()
        else:
            result = json.load(fp)
            if not isinstance(result, list):
                result = [result]
        return result

    def load_yaml(self, fp):
        """Load a yaml file into a dataframe; returns None when the format is not "yaml" """
        if self._reader_format == "yaml":
            return pd.DataFrame(safe_load(fp))
        return None

    def load_dataframes(self, fp, skip_data=False, read_sample_chunk: bool = False) -> Iterable:
        """load and return the appropriate pandas dataframe.

        :param fp: file-like object to read from
        :param skip_data: limit reading data
        :param read_sample_chunk: indicates whether a single chunk should only be read to generate schema
        :return: a generator of dataframes loaded from files described in the configuration
        :raises AirbyteTracedException: (config error) for an unsupported format, a pandas ParserError
            or a UnicodeDecodeError
        """
        readers = {
            # pandas.read_csv additional arguments can be passed to customize how to parse csv.
            # see https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_csv.html
            "csv": pd.read_csv,
            # We can add option to call to pd.normalize_json to normalize semi-structured JSON data into a flat table
            # by asking user to specify how to flatten the nested columns
            "flat_json": pd.read_json,
            "html": pd.read_html,
            "excel": pd.read_excel,
            "excel_binary": pd.read_excel,
            "fwf": pd.read_fwf,
            "feather": pd.read_feather,
            "parquet": pd.read_parquet,
            "orc": pd.read_orc,
            "pickle": pd.read_pickle,
        }

        try:
            reader = readers[self._reader_format]
        except KeyError as err:
            error_msg = f"Reader {self._reader_format} is not supported."
            logger.error(f"{error_msg}\n{traceback.format_exc()}")
            raise AirbyteTracedException(message=error_msg, internal_message=error_msg, failure_type=FailureType.config_error) from err

        # Work on a copy so that the options configured on the client are never mutated
        reader_options = {**self._reader_options}
        try:
            if self._reader_format == "csv":
                bytes_read = 0
                reader_options["chunksize"] = self.CSV_CHUNK_SIZE
                if skip_data:
                    reader_options["nrows"] = 0
                    reader_options["index_col"] = 0

                for record in reader(fp, **reader_options):
                    bytes_read += sys.getsizeof(record)
                    yield record
                    if read_sample_chunk and bytes_read >= self.CSV_CHUNK_SIZE:
                        return
            elif self._reader_format == "excel_binary":
                reader_options["engine"] = "pyxlsb"
                yield reader(fp, **reader_options)
            elif self._reader_format == "parquet":
                reader_options["engine"] = "fastparquet"
                yield reader(fp, **reader_options)
            elif self._reader_format == "excel":
                try:
                    for df_chunk in self.openpyxl_chunk_reader(fp, **reader_options):
                        yield df_chunk
                except (InvalidFileException, BadZipFile):
                    # openpyxl could not open the file: fall back to the pandas reader
                    yield pd.read_excel(fp, **reader_options)
            else:
                yield reader(fp, **reader_options)
        except ParserError as err:
            error_msg = f"File {fp} can not be parsed. Please check your reader_options. https://pandas.pydata.org/pandas-docs/stable/user_guide/io.html"
            logger.error(f"{error_msg}\n{traceback.format_exc()}")
            raise AirbyteTracedException(message=error_msg, internal_message=error_msg, failure_type=FailureType.config_error) from err
        except UnicodeDecodeError as err:
            error_msg = (
                f"File {fp} can't be parsed with reader of chosen type ({self._reader_format}). "
                f"Please check provided Format and Reader Options. {repr(err)}."
            )
            logger.error(f"{error_msg}\n{traceback.format_exc()}")
            raise AirbyteTracedException(message=error_msg, internal_message=error_msg, failure_type=FailureType.config_error) from err

    @staticmethod
    def dtype_to_json_type(current_type: str, dtype) -> str:
        """Convert Pandas Dataframe types to Airbyte Types.

        :param current_type: str - one of the following types based on previous dataframes
        :param dtype: Pandas Dataframe type
        :return: Corresponding Airbyte Type
        """
        number_types = ("int64", "float64")
        if current_type == "string":
            # previous column values was of the string type, no sense to look further
            return current_type
        if dtype == object:
            return "string"
        if dtype in number_types and (not current_type or current_type == "number"):
            return "number"
        if dtype == "bool" and (not current_type or current_type == "boolean"):
            return "boolean"
        if dtype == "datetime64[ns]":
            return "date-time"
        return "string"

    @property
    def reader(self) -> "URLFile":
        """New, not yet opened, reader instance for the configured url and provider"""
        return self.reader_class(url=self._url, provider=self._provider, binary=self.binary_source, encoding=self.encoding)

    # NOTE(review): read() is a generator function, so calling it only creates the generator and the
    # exceptions are raised later, while it is iterated - presumably outside of the scope of this
    # decorator. Confirm whether the retry is effective.
    @backoff.on_exception(backoff.expo, ConnectionResetError, on_backoff=backoff_handler, max_tries=5, max_time=60)
    def read(self, fields: Iterable = None) -> Iterable[dict]:
        """Read data from the stream

        :param fields: names of the columns to emit; ignored for json / jsonl, all columns when empty
        :return: a generator of records (dicts)
        :raises ConnectionResetError: re-raised as is after being logged
        :raises AirbyteTracedException: (config error) when a urllib3 ProtocolError occurs
        """
        with self.reader.open() as fp:
            try:
                if self._reader_format in ["json", "jsonl"]:
                    yield from self.load_nested_json(fp)
                elif self._reader_format == "yaml":
                    fields = set(fields) if fields else None
                    df = self.load_yaml(fp)
                    df.columns = df.columns.astype(str)
                    columns = fields.intersection(set(df.columns)) if fields else df.columns
                    df = df.where(pd.notnull(df), None)
                    yield from df[list(columns)].to_dict(orient="records")
                else:
                    fields = set(fields) if fields else None
                    if self.binary_source:
                        fp = self._cache_stream(fp)
                    if self._is_zip:
                        fp = self._unzip(fp)
                    for df in self.load_dataframes(fp):
                        df.columns = df.columns.astype(str)
                        columns = fields.intersection(set(df.columns)) if fields else df.columns
                        df.replace({np.nan: None}, inplace=True)
                        yield from df[list(columns)].to_dict(orient="records")
            except ConnectionResetError:
                logger.info(f"Catched `connection reset error - 104`, stream: {self.stream_name} ({self.reader.full_url})")
                # Bare raise keeps the message and the traceback of the original error
                raise
            except ProtocolError as err:
                error_msg = (
                    f"File {fp} can not be opened due to connection issues on provider side. Please check provided links and options"
                )
                logger.error(f"{error_msg}\n{traceback.format_exc()}")
                raise AirbyteTracedException(message=error_msg, internal_message=error_msg, failure_type=FailureType.config_error) from err

    def _unzip(self, fp):
        """Extract the zip archive located at `fp.name` and open the first extracted entry

        :param fp: file object whose `name` attribute is the path of the archive on disk
        :return: the first entry listed in the extraction directory, opened in "rb" mode
        :raises AirbyteTracedException: (config error) when the archive does not contain any entry
        """
        tmp_dir = tempfile.TemporaryDirectory()
        # Keep a reference on the instance: a TemporaryDirectory removes its directory once the
        # object is garbage collected, which would otherwise happen as soon as this method returns.
        self._tmp_dir = tmp_dir
        with zipfile.ZipFile(str(fp.name), "r") as zip_ref:
            zip_ref.extractall(tmp_dir.name)

        extracted = os.listdir(tmp_dir.name)
        logger.info("Temp dir content: " + str(extracted))
        if not extracted:
            error_msg = "The zip archive does not contain any file."
            raise AirbyteTracedException(message=error_msg, internal_message=error_msg, failure_type=FailureType.config_error)

        final_file: str = os.path.join(tmp_dir.name, extracted[0])
        logger.info("Pick up first file: " + final_file)
        return open(final_file, "rb")

    def _cache_stream(self, fp):
        """cache stream to file

        :param fp: opened binary file object; it is closed once copied
        :return: a named temporary file holding the same content, positioned at its beginning
        """
        fp_tmp = tempfile.NamedTemporaryFile(mode="w+b")
        # Copy chunk by chunk so that the whole file never has to be held in memory
        while True:
            chunk = fp.read(self.CACHE_COPY_BUFFER_SIZE)
            if not chunk:
                break
            fp_tmp.write(chunk)
        fp_tmp.seek(0)
        fp.close()
        return fp_tmp

    def _stream_properties(self, fp, empty_schema: bool = False, read_sample_chunk: bool = False):
        """
        empty_schema param is used to check connectivity, i.e. we only read a header and do not produce stream properties
        read_sample_chunk is used to determine if just one chunk should be read to generate schema

        :return: mapping of column name to its JSON schema definition (every type is nullable)
        """
        if self._reader_format == "yaml":
            df_list = [self.load_yaml(fp)]
        else:
            if self.binary_source:
                fp = self._cache_stream(fp)
            if self._is_zip:
                fp = self._unzip(fp)
            df_list = self.load_dataframes(fp, skip_data=empty_schema, read_sample_chunk=read_sample_chunk)

        fields = {}
        for df in df_list:
            for col in df.columns:
                # if data type of the same column differs in dataframes, we choose the broadest one
                prev_frame_column_type = fields.get(col)
                df_type = df[col].dtype
                fields[col] = self.dtype_to_json_type(prev_frame_column_type, df_type)

        return {
            field: (
                {"type": ["string", "null"], "format": "date-time"} if fields[field] == "date-time" else {"type": [fields[field], "null"]}
            )
            for field in fields
        }

    def streams(self, empty_schema: bool = False) -> Iterable:
        """Discovers available streams

        :param empty_schema: forwarded to _stream_properties as `empty_schema` (not used for json / jsonl)
        :return: a generator yielding a single full-refresh AirbyteStream
        """
        # TODO handle discovery of directories of multiple files instead
        with self.reader.open() as fp:
            if self._reader_format in ["json", "jsonl"]:
                json_schema = self.load_nested_json_schema(fp)
            else:
                json_schema = {
                    "$schema": "http://json-schema.org/draft-07/schema#",
                    "type": "object",
                    "properties": self._stream_properties(fp, empty_schema=empty_schema, read_sample_chunk=True),
                }
        yield AirbyteStream(name=self.stream_name, json_schema=json_schema, supported_sync_modes=[SyncMode.full_refresh])

    def openpyxl_chunk_reader(self, file, **kwargs):
        """
        Use openpyxl's lazy loading feature to read Excel files (xlsx only) in chunks of 500 lines at a time.

        :param file: path or file object handed to openpyxl's load_workbook
        :param kwargs: reader options; only "header", "skiprows" and "names" are used
        :return: a generator of dataframes holding at most 500 rows each, for every sheet of the workbook
        :raises AirbyteTracedException: (config error) when a sheet has no rows, no header row or no data rows
        """
        # Retrieve reader options
        header = kwargs.get("header", 0)
        skiprows = kwargs.get("skiprows", 0)
        user_provided_column_names = kwargs.get("names")
        chunk_size = 500

        # Load workbook with data-only to avoid loading formulas
        work_book = load_workbook(filename=file, data_only=True, read_only=True)
        try:
            for sheetname in work_book.sheetnames:
                work_sheet = work_book[sheetname]
                data = list(work_sheet.iter_rows(values_only=True))

                # Skip rows as specified
                data = data[skiprows:]
                if len(data) == 0:
                    raise AirbyteTracedException(
                        message="File does not contain enough rows to process.",
                        internal_message=f"Sheet {sheetname} contains no data after applying header and skiprows.",
                        failure_type=FailureType.config_error,
                    )

                # Determine column names
                if user_provided_column_names:
                    column_names = user_provided_column_names
                elif header is not None:
                    if len(data) <= header:
                        raise AirbyteTracedException(
                            message="File does not contain enough rows to extract headers.",
                            internal_message=f"Sheet {sheetname} does not have enough rows for the specified header {header}.",
                            failure_type=FailureType.config_error,
                        )
                    column_names = data[header]  # Extract the header row
                    data = data[header + 1 :]  # Remove the header row and rows above it
                else:
                    raise AirbyteTracedException(
                        message="Unable to determine column names. Please provide valid reader options.",
                        internal_message="No header or column names specified.",
                        failure_type=FailureType.config_error,
                    )

                if column_names is None or len(column_names) == 0:
                    raise AirbyteTracedException(
                        message="Column names could not be determined.",
                        internal_message="Column names are empty or invalid.",
                        failure_type=FailureType.config_error,
                    )

                if len(data) == 0:
                    raise AirbyteTracedException(
                        message="File does not contain any data rows.",
                        internal_message=f"Sheet {sheetname} contains no data rows after applying header and skiprows.",
                        failure_type=FailureType.config_error,
                    )

                chunk = []
                for row in data:
                    chunk.append(dict(zip(column_names, row)))
                    if len(chunk) == chunk_size:
                        yield pd.DataFrame(chunk)
                        chunk = []

                if chunk:
                    yield pd.DataFrame(chunk)
        finally:
            # A workbook loaded in read-only mode has to be closed explicitly
            work_book.close()
class URLFileSecure(URLFile):
    """URLFile variant that refuses to work with local files.

    Instantiation fails when the provider storage is the local one or when the
    URL uses the file:// prefix; everything else behaves like URLFile.
    """

    def __init__(self, url: str, provider: dict, binary=None, encoding=None):
        uses_local_storage = provider["storage"].lower() == LOCAL_STORAGE_NAME
        points_to_local_file = url.startswith("file://")
        if points_to_local_file or uses_local_storage:
            raise RuntimeError("the local file storage is not supported by this connector.")
        super().__init__(url, provider, binary, encoding)