#
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
#


import json
import logging
import os
import sys
import tempfile
import traceback
import urllib
import zipfile
from os import environ
from typing import Iterable
from urllib.parse import urlparse
from zipfile import BadZipFile

import backoff
import boto3
import botocore
import google
import numpy as np
import pandas as pd
import smart_open
import smart_open.ssh
from azure.storage.blob import BlobServiceClient
from genson import SchemaBuilder
from google.cloud.storage import Client as GCSClient
from google.oauth2 import service_account
from openpyxl import load_workbook
from openpyxl.utils.exceptions import InvalidFileException
from pandas.errors import ParserError
from paramiko import SSHException
from urllib3.exceptions import ProtocolError
from yaml import safe_load

from airbyte_cdk.entrypoint import logger
from airbyte_cdk.models import AirbyteStream, FailureType, SyncMode
from airbyte_cdk.utils import AirbyteTracedException, is_cloud_environment

from .utils import LOCAL_STORAGE_NAME, backoff_handler


SSH_TIMEOUT = 60

# Force the log level of the smart-open logger to ERROR - https://github.com/airbytehq/airbyte/pull/27157
logging.getLogger("smart_open").setLevel(logging.ERROR)

class URLFile:
    """Class to manage reading from a file located at any of the supported providers.

    Supported examples of URLs this class accepts are as follows:
    ```
    s3://my_bucket/my_key
    s3://my_key:my_secret@my_bucket/my_key
    gs://my_bucket/my_blob
    hdfs:///path/file (not tested)
    hdfs://path/file (not tested)
    webhdfs://host:port/path/file (not tested)
    ./local/path/file
    ~/local/path/file
    local/path/file
    ./local/path/file.gz
    file:///home/user/file
    file:///home/user/file.bz2
    [ssh|scp|sftp]://username@host//path/file
    [ssh|scp|sftp]://username@host/path/file
    [ssh|scp|sftp]://username:password@host/path/file
    ```
    """

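    # Illustrative usage sketch (the bucket, key, and credentials below are hypothetical):
    #
    #     provider = {"storage": "S3", "aws_access_key_id": "...", "aws_secret_access_key": "..."}
    #     with URLFile(url="s3://my_bucket/data.csv", provider=provider, binary=False).open() as fp:
    #         header = fp.readline()
    #
    # open() returns the URLFile instance itself, while the context manager yields the underlying
    # file-like object and closes it on exit.
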
    def __init__(self, url: str, provider: dict, binary=None, encoding=None):
        self._url = url
        self._provider = provider
        self._file = None
        self.args = {
            "mode": "rb" if binary else "r",
            "encoding": None if binary else encoding,
        }

    def __enter__(self):
        return self._file

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()

    @property
    def full_url(self):
        return f"{self.storage_scheme}{self.url}"

    def close(self):
        if self._file:
            self._file.close()
            self._file = None

    def backoff_giveup(self, error):
        # https://github.com/airbytehq/oncall/issues/1954
        if isinstance(error, SSHException) and str(error).startswith("Error reading SSH protocol banner"):
            # We need to clear the smart_open internal _SSH cache from the previous attempt, otherwise:
            # SSHException('SSH session not active')
            # will be raised
            smart_open.ssh._SSH.clear()
            return False
        return True

    def open(self):
        self.close()
        _open = backoff.on_exception(backoff.expo, Exception, max_tries=5, giveup=self.backoff_giveup)(self._open)
        try:
            self._file = _open()
        except google.api_core.exceptions.NotFound as err:
            raise FileNotFoundError(self.url) from err
        return self

    def _open(self):
        storage = self.storage_scheme
        url = self.url

        if storage == "gs://":
            return self._open_gcs_url()
        elif storage == "s3://":
            return self._open_aws_url()
        elif storage == "azure://":
            return self._open_azblob_url()
        elif storage == "webhdfs://":
            host = self._provider["host"]
            port = self._provider["port"]
            return smart_open.open(f"webhdfs://{host}:{port}/{url}", **self.args)
        elif storage in ("ssh://", "scp://", "sftp://"):
            # We need to quote parameters to deal with special characters
            # https://bugs.python.org/issue18140
            user = urllib.parse.quote(self._provider["user"])
            host = urllib.parse.quote(self._provider["host"])
            url = urllib.parse.quote(url)
            # TODO: Remove int casting when https://github.com/airbytehq/airbyte/issues/4952 is addressed
            # TODO: The "port" field in spec.json must also be changed
            _port_value = self._provider.get("port", 22)
            try:
                port = int(_port_value)
            except ValueError as err:
                raise ValueError(f"{_port_value} is not a valid integer for the port") from err
            # Explicitly turn off ssh keys stored in ~/.ssh
            transport_params = {"connect_kwargs": {"look_for_keys": False}, "timeout": SSH_TIMEOUT}
            if "password" in self._provider:
                password = urllib.parse.quote(self._provider["password"])
                uri = f"{storage}{user}:{password}@{host}:{port}/{url}"
            else:
                uri = f"{storage}{user}@{host}:{port}/{url}"
            return smart_open.open(uri, transport_params=transport_params, **self.args)
        elif storage in ("https://", "http://"):
            transport_params = None
            if "user_agent" in self._provider and self._provider["user_agent"]:
                airbyte_version = environ.get("AIRBYTE_VERSION", "0.0")
                transport_params = {"headers": {"Accept-Encoding": "identity", "User-Agent": f"Airbyte/{airbyte_version}"}}
            logger.info(f"TransportParams: {transport_params}")
            return smart_open.open(self.full_url, transport_params=transport_params, **self.args)
        return smart_open.open(self.full_url, **self.args)

    @property
    def url(self) -> str:
        """Return the URL with its prefix (scheme) removed.

        :return: the corresponding URL without URL prefix / scheme
        """
        parse_result = urlparse(self._url)
        if parse_result.scheme:
            return self._url.split("://")[-1]
        else:
            return self._url

    @property
    def storage_scheme(self) -> str:
        """Convert the configured storage name to the proper URL prefix.

        :return: the corresponding URL prefix / scheme
        """
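        # For example, "GCS" maps to "gs://", "AZBLOB" to "azure://", and both "SSH" and "SCP" map to "scp://".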
        storage_name = self._provider["storage"].upper()
        parse_result = urlparse(self._url)

        if storage_name == "GCS":
            return "gs://"
        elif storage_name == "S3":
            return "s3://"
        elif storage_name == "AZBLOB":
            return "azure://"
        elif storage_name == "HTTPS":
            return "https://"
        elif storage_name == "SSH" or storage_name == "SCP":
            return "scp://"
        elif storage_name == "SFTP":
            return "sftp://"
        elif storage_name == "WEBHDFS":
            return "webhdfs://"
        elif storage_name == "LOCAL":
            return "file://"
        elif parse_result.scheme:
            return parse_result.scheme

        logger.error(f"Unknown Storage provider in: {self._url}")
        return ""

    def _open_gcs_url(self) -> object:
        service_account_json = self._provider.get("service_account_json")
        credentials = None
        if service_account_json:
            try:
                credentials = json.loads(self._provider["service_account_json"])
            except json.decoder.JSONDecodeError as err:
                error_msg = f"Failed to parse gcs service account json: {repr(err)}"
                logger.error(f"{error_msg}\n{traceback.format_exc()}")
                raise AirbyteTracedException(message=error_msg, internal_message=error_msg, failure_type=FailureType.config_error) from err

        if credentials:
            credentials = service_account.Credentials.from_service_account_info(credentials)
            client = GCSClient(credentials=credentials, project=credentials._project_id)
        else:
            client = GCSClient.create_anonymous_client()
        file_to_close = smart_open.open(self.full_url, transport_params={"client": client}, **self.args)

        return file_to_close

    def _open_aws_url(self):
        aws_access_key_id = self._provider.get("aws_access_key_id")
        aws_secret_access_key = self._provider.get("aws_secret_access_key")
        use_aws_account = aws_access_key_id and aws_secret_access_key

        if use_aws_account:
            aws_access_key_id = self._provider.get("aws_access_key_id", "")
            aws_secret_access_key = self._provider.get("aws_secret_access_key", "")
            url = f"{self.storage_scheme}{aws_access_key_id}:{aws_secret_access_key}@{self.url}"
            result = smart_open.open(url, **self.args)
        else:
            config = botocore.client.Config(signature_version=botocore.UNSIGNED)
            params = {"client": boto3.client("s3", config=config)}
            result = smart_open.open(self.full_url, transport_params=params, **self.args)
        return result

    def _open_azblob_url(self):
        storage_account = self._provider.get("storage_account")
        storage_acc_url = f"https://{storage_account}.blob.core.windows.net"
        sas_token = self._provider.get("sas_token", None)
        shared_key = self._provider.get("shared_key", None)
        # If both credentials are provided, shared_key is preferred because it grants permissions over the entire storage account.
        credential = shared_key or sas_token

        if credential:
            client = BlobServiceClient(account_url=storage_acc_url, credential=credential)
        else:
            # Assume anonymous public read access when no credential is given.
            client = BlobServiceClient(account_url=storage_acc_url)

        url = f"{self.storage_scheme}{self.url}"
        return smart_open.open(url, transport_params=dict(client=client), **self.args)


class Client:
    """Class that manages reading and parsing data from streams"""

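    # Illustrative usage sketch (URL, provider, and options below are hypothetical):
    #
    #     client = Client(
    #         dataset_name="my_stream",
    #         url="https://example.com/data.csv",
    #         provider={"storage": "HTTPS", "user_agent": False},
    #         format="csv",
    #         reader_options={"sep": ","},
    #     )
    #     for record in client.read():
    #         ...
    #
    # reader_options are forwarded to the underlying pandas reader (pandas.read_csv in this case).
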
    CSV_CHUNK_SIZE = 10_000
    binary_formats = {"excel", "excel_binary", "feather", "parquet", "orc", "pickle"}

    def __init__(self, dataset_name: str, url: str, provider: dict, format: str = None, reader_options: dict = None):
        self._dataset_name = dataset_name
        self._url = url
        self._provider = provider
        self._reader_format = format or "csv"
        self._reader_options = reader_options or {}
        self._is_zip = url.lower().endswith(".zip")
        self.binary_source = self._reader_format in self.binary_formats or self._is_zip
        self.encoding = self._reader_options.get("encoding")

    @property
    def reader_class(self):
        if is_cloud_environment():
            return URLFileSecure

        return URLFile

    @property
    def stream_name(self) -> str:
        if self._dataset_name:
            return self._dataset_name
        return f"file_{self._provider['storage']}.{self._reader_format}"

    def load_nested_json_schema(self, fp) -> dict:
        # Use the Genson library to take JSON objects and generate schemas that describe them.
        builder = SchemaBuilder()
        if self._reader_format == "jsonl":
            for o in self.read():
                builder.add_object(o)
        else:
            builder.add_object(json.load(fp))

        result = builder.to_schema()
        if "items" in result:
            # This means we have a JSON list, e.g. [{...}, {...}],
            # but we need to emit the schema of the inner dict.
            result = result["items"]
        result["$schema"] = "http://json-schema.org/draft-07/schema#"
        return result

    def load_nested_json(self, fp) -> list:
        if self._reader_format == "jsonl":
            result = []
            line = fp.readline()
            while line:
                result.append(json.loads(line))
                line = fp.readline()
        else:
            result = json.load(fp)
            if not isinstance(result, list):
                result = [result]
        return result

    def load_yaml(self, fp):
        if self._reader_format == "yaml":
            return pd.DataFrame(safe_load(fp))

    def load_dataframes(self, fp, skip_data=False, read_sample_chunk: bool = False) -> Iterable:
        """Load and return the appropriate pandas dataframes.

        :param fp: file-like object to read from
        :param skip_data: limit reading data
        :param read_sample_chunk: indicates whether only a single chunk should be read to generate the schema
        :return: a list of dataframes loaded from the files described in the configuration
        """
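        # Reader options from the connector configuration are forwarded to the pandas reader selected
        # below (with a few format-specific keys added, e.g. chunksize for csv). For example, a csv
        # configured with reader_options {"sep": ";", "header": 0} is read via pd.read_csv(fp, sep=";", header=0, ...).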
        readers = {
            # pandas.read_csv additional arguments can be passed to customize how to parse csv.
            # see https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_csv.html
            "csv": pd.read_csv,
            # We could add an option to call pd.json_normalize to normalize semi-structured JSON data into a flat table
            # by asking the user to specify how to flatten the nested columns.
            "flat_json": pd.read_json,
            "html": pd.read_html,
            "excel": pd.read_excel,
            "excel_binary": pd.read_excel,
            "fwf": pd.read_fwf,
            "feather": pd.read_feather,
            "parquet": pd.read_parquet,
            "orc": pd.read_orc,
            "pickle": pd.read_pickle,
        }

        try:
            reader = readers[self._reader_format]
        except KeyError as err:
            error_msg = f"Reader {self._reader_format} is not supported."
            logger.error(f"{error_msg}\n{traceback.format_exc()}")
            raise AirbyteTracedException(message=error_msg, internal_message=error_msg, failure_type=FailureType.config_error) from err

        reader_options = {**self._reader_options}
        try:
            if self._reader_format == "csv":
                bytes_read = 0
                reader_options["chunksize"] = self.CSV_CHUNK_SIZE
                if skip_data:
                    reader_options["nrows"] = 0
                    reader_options["index_col"] = 0
                for record in reader(fp, **reader_options):
                    bytes_read += sys.getsizeof(record)
                    yield record
                    if read_sample_chunk and bytes_read >= self.CSV_CHUNK_SIZE:
                        return
            elif self._reader_format == "excel_binary":
                reader_options["engine"] = "pyxlsb"
                yield reader(fp, **reader_options)
            elif self._reader_format == "parquet":
                reader_options["engine"] = "fastparquet"
                yield reader(fp, **reader_options)
            elif self._reader_format == "excel":
                try:
                    for df_chunk in self.openpyxl_chunk_reader(fp, **reader_options):
                        yield df_chunk
                except (InvalidFileException, BadZipFile):
                    yield pd.read_excel(fp, **reader_options)
            else:
                yield reader(fp, **reader_options)
        except ParserError as err:
            error_msg = f"File {fp} can not be parsed. Please check your reader_options. https://pandas.pydata.org/pandas-docs/stable/user_guide/io.html"
            logger.error(f"{error_msg}\n{traceback.format_exc()}")
            raise AirbyteTracedException(message=error_msg, internal_message=error_msg, failure_type=FailureType.config_error) from err
        except UnicodeDecodeError as err:
            error_msg = (
                f"File {fp} can't be parsed with reader of chosen type ({self._reader_format}). "
                f"Please check provided Format and Reader Options. {repr(err)}."
            )
            logger.error(f"{error_msg}\n{traceback.format_exc()}")
            raise AirbyteTracedException(message=error_msg, internal_message=error_msg, failure_type=FailureType.config_error) from err

    @staticmethod
    def dtype_to_json_type(current_type: str, dtype) -> str:
        """Convert pandas dataframe types to Airbyte types.

        :param current_type: str - the type inferred so far from previous dataframes
        :param dtype: pandas dataframe type
        :return: the corresponding Airbyte type
        """
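        # For example, an int64 column first maps to "number"; if a later chunk of the same column
        # comes back with object dtype, the column widens to "string" and stays there.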
        number_types = ("int64", "float64")
        if current_type == "string":
            # Previous column values were of the string type; no need to look further.
            return current_type
        if dtype == object:
            return "string"
        if dtype in number_types and (not current_type or current_type == "number"):
            return "number"
        if dtype == "bool" and (not current_type or current_type == "boolean"):
            return "boolean"
        if dtype == "datetime64[ns]":
            return "date-time"
        return "string"

    @property
    def reader(self) -> reader_class:
        return self.reader_class(url=self._url, provider=self._provider, binary=self.binary_source, encoding=self.encoding)

    @backoff.on_exception(backoff.expo, ConnectionResetError, on_backoff=backoff_handler, max_tries=5, max_time=60)
    def read(self, fields: Iterable = None) -> Iterable[dict]:
        """Read data from the stream"""
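        # Yields one dict per record. For tabular formats each dataframe chunk is converted with
        # DataFrame.to_dict(orient="records") after NaN values are replaced with None.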
        with self.reader.open() as fp:
            try:
                if self._reader_format in ["json", "jsonl"]:
                    yield from self.load_nested_json(fp)
                elif self._reader_format == "yaml":
                    fields = set(fields) if fields else None
                    df = self.load_yaml(fp)
                    df.columns = df.columns.astype(str)
                    columns = fields.intersection(set(df.columns)) if fields else df.columns
                    df = df.where(pd.notnull(df), None)
                    yield from df[list(columns)].to_dict(orient="records")
                else:
                    fields = set(fields) if fields else None
                    if self.binary_source:
                        fp = self._cache_stream(fp)
                    if self._is_zip:
                        fp = self._unzip(fp)
                    for df in self.load_dataframes(fp):
                        df.columns = df.columns.astype(str)
                        columns = fields.intersection(set(df.columns)) if fields else df.columns
                        df.replace({np.nan: None}, inplace=True)
                        yield from df[list(columns)].to_dict(orient="records")
            except ConnectionResetError:
                logger.info(f"Caught `connection reset error - 104`, stream: {self.stream_name} ({self.reader.full_url})")
                raise ConnectionResetError
            except ProtocolError as err:
                error_msg = (
                    f"File {fp} can not be opened due to connection issues on provider side. Please check provided links and options"
                )
                logger.error(f"{error_msg}\n{traceback.format_exc()}")
                raise AirbyteTracedException(message=error_msg, internal_message=error_msg, failure_type=FailureType.config_error) from err

    def _unzip(self, fp):
        tmp_dir = tempfile.TemporaryDirectory()
        with zipfile.ZipFile(str(fp.name), "r") as zip_ref:
            zip_ref.extractall(tmp_dir.name)

        logger.info("Temp dir content: " + str(os.listdir(tmp_dir.name)))
        final_file: str = os.path.join(tmp_dir.name, os.listdir(tmp_dir.name)[0])
        logger.info("Pick up first file: " + final_file)
        fp_tmp = open(final_file, "rb")
        return fp_tmp

    def _cache_stream(self, fp):
        """Cache the stream to a temporary file."""
        fp_tmp = tempfile.NamedTemporaryFile(mode="w+b")
        fp_tmp.write(fp.read())
        fp_tmp.seek(0)
        fp.close()
        return fp_tmp

    def _stream_properties(self, fp, empty_schema: bool = False, read_sample_chunk: bool = False):
        """
        empty_schema is used for connectivity checks: we only read a header and do not produce stream properties.
        read_sample_chunk determines whether just one chunk should be read to generate the schema.
        """
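        # The result maps each column to a nullable JSON-schema type, for example (column names hypothetical):
        #     {"id": {"type": ["number", "null"]}, "created_at": {"type": ["string", "null"], "format": "date-time"}}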
        if self._reader_format == "yaml":
            df_list = [self.load_yaml(fp)]
        else:
            if self.binary_source:
                fp = self._cache_stream(fp)
            if self._is_zip:
                fp = self._unzip(fp)
            df_list = self.load_dataframes(fp, skip_data=empty_schema, read_sample_chunk=read_sample_chunk)
        fields = {}
        for df in df_list:
            for col in df.columns:
                # If the data type of the same column differs between dataframes, we choose the broadest one.
                prev_frame_column_type = fields.get(col)
                df_type = df[col].dtype
                fields[col] = self.dtype_to_json_type(prev_frame_column_type, df_type)
        return {
            field: (
                {"type": ["string", "null"], "format": "date-time"} if fields[field] == "date-time" else {"type": [fields[field], "null"]}
            )
            for field in fields
        }

    def streams(self, empty_schema: bool = False) -> Iterable:
        """Discovers available streams"""
        # TODO handle discovery of directories of multiple files instead
        with self.reader.open() as fp:
            if self._reader_format in ["json", "jsonl"]:
                json_schema = self.load_nested_json_schema(fp)
            else:
                json_schema = {
                    "$schema": "http://json-schema.org/draft-07/schema#",
                    "type": "object",
                    "properties": self._stream_properties(fp, empty_schema=empty_schema, read_sample_chunk=True),
                }
            yield AirbyteStream(name=self.stream_name, json_schema=json_schema, supported_sync_modes=[SyncMode.full_refresh])

    def openpyxl_chunk_reader(self, file, **kwargs):
        """
        Use openpyxl's lazy loading feature to read Excel files (xlsx only) in chunks of 500 lines at a time.
        """
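        # Yields pandas DataFrames of at most 500 rows, built from dicts keyed by the detected (or
        # user-provided) column names; every sheet in the workbook is processed in turn.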
        # Retrieve reader options
        header = kwargs.get("header", 0)
        skiprows = kwargs.get("skiprows", 0)
        user_provided_column_names = kwargs.get("names")
        chunk_size = 500

        # Load the workbook with data_only to avoid loading formulas
        work_book = load_workbook(filename=file, data_only=True, read_only=True)

        for sheetname in work_book.sheetnames:
            work_sheet = work_book[sheetname]
            data = list(work_sheet.iter_rows(values_only=True))

            # Skip rows as specified
            data = data[skiprows:]

            if len(data) == 0:
                raise AirbyteTracedException(
                    message="File does not contain enough rows to process.",
                    internal_message=f"Sheet {sheetname} contains no data after applying header and skiprows.",
                    failure_type=FailureType.config_error,
                )

            # Determine column names
            if user_provided_column_names:
                column_names = user_provided_column_names
            elif header is not None:
                if len(data) <= header:
                    raise AirbyteTracedException(
                        message="File does not contain enough rows to extract headers.",
                        internal_message=f"Sheet {sheetname} does not have enough rows for the specified header {header}.",
                        failure_type=FailureType.config_error,
                    )
                column_names = data[header]  # Extract the header row
                data = data[header + 1 :]  # Remove the header row and rows above it
            else:
                raise AirbyteTracedException(
                    message="Unable to determine column names. Please provide valid reader options.",
                    internal_message="No header or column names specified.",
                    failure_type=FailureType.config_error,
                )

            if column_names is None or len(column_names) == 0:
                raise AirbyteTracedException(
                    message="Column names could not be determined.",
                    internal_message="Column names are empty or invalid.",
                    failure_type=FailureType.config_error,
                )

            if len(data) == 0:
                raise AirbyteTracedException(
                    message="File does not contain any data rows.",
                    internal_message=f"Sheet {sheetname} contains no data rows after applying header and skiprows.",
                    failure_type=FailureType.config_error,
                )

            chunk = []
            for row in data:
                chunk.append(dict(zip(column_names, row)))
                if len(chunk) == chunk_size:
                    yield pd.DataFrame(chunk)
                    chunk = []

            if chunk:
                yield pd.DataFrame(chunk)


class URLFileSecure(URLFile):
    """Overrides the default URLFile logic: this connector should not work with local files."""

    def __init__(self, url: str, provider: dict, binary=None, encoding=None):
        storage_name = provider["storage"].lower()
        if url.startswith("file://") or storage_name == LOCAL_STORAGE_NAME:
            raise RuntimeError("the local file storage is not supported by this connector.")
        super().__init__(url, provider, binary, encoding)