# airbyte/airbyte-integrations/connectors/source-file/source_file/client.py

#
# MIT License
#
# Copyright (c) 2020 Airbyte
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
#
import json
import traceback
from typing import Iterable
from urllib.parse import urlparse
import google
import numpy as np
import pandas as pd
import smart_open
from airbyte_protocol import AirbyteStream
from base_python.entrypoint import logger
from botocore import UNSIGNED
from botocore.config import Config
from genson import SchemaBuilder
from google.cloud.storage import Client as GCSClient
from google.oauth2 import service_account
class ConfigurationError(Exception):
"""Client mis-configured"""
class PermissionsError(Exception):
"""User don't have enough permissions"""
class URLFile:
"""Class to manage read from file located at different providers
Supported examples of URL this class can accept are as follows:
```
s3://my_bucket/my_key
s3://my_key:my_secret@my_bucket/my_key
gs://my_bucket/my_blob
hdfs:///path/file (not tested)
hdfs://path/file (not tested)
webhdfs://host:port/path/file (not tested)
./local/path/file
~/local/path/file
local/path/file
./local/path/file.gz
file:///home/user/file
file:///home/user/file.bz2
[ssh|scp|sftp]://username@host//path/file
[ssh|scp|sftp]://username@host/path/file
[ssh|scp|sftp]://username:password@host/path/file
```
"""
def __init__(self, url: str, provider: dict):
self._url = url
self._provider = provider
self._file = None
def __enter__(self):
return self._file
def __exit__(self, exc_type, exc_val, exc_tb):
self.close()
@property
def full_url(self):
return f"{self.storage_scheme}{self.url}"
def close(self):
if self._file:
self._file.close()
self._file = None
def open(self, binary=False):
self.close()
try:
self._file = self._open(binary=binary)
except google.api_core.exceptions.NotFound as err:
raise FileNotFoundError(self.url) from err
return self
def _open(self, binary):
mode = "rb" if binary else "r"
storage = self.storage_scheme
url = self.url
if storage == "gs://":
return self._open_gcs_url(binary=binary)
elif storage == "s3://":
return self._open_aws_url(binary=binary)
elif storage == "webhdfs://":
host = self._provider["host"]
port = self._provider["port"]
return smart_open.open(f"webhdfs://{host}:{port}/{url}", mode=mode)
elif storage in ("ssh://", "scp://", "sftp://"):
user = self._provider["user"]
host = self._provider["host"]
port = self._provider.get("port", 22)
# Explicitly turn off ssh keys stored in ~/.ssh
transport_params = {"connect_kwargs": {"look_for_keys": False}}
if "password" in self._provider:
password = self._provider["password"]
uri = f"{storage}{user}:{password}@{host}:{port}/{url}"
else:
uri = f"{storage}{user}@{host}:{port}/{url}"
return smart_open.open(uri, transport_params=transport_params, mode=mode)
return smart_open.open(self.full_url, mode=mode)
@property
def url(self) -> str:
"""Convert URL to remove the URL prefix (scheme)
:return: the corresponding URL without URL prefix / scheme
"""
parse_result = urlparse(self._url)
if parse_result.scheme:
return self._url.split("://")[-1]
else:
return self._url
@property
def storage_scheme(self) -> str:
"""Convert Storage Names to the proper URL Prefix
:return: the corresponding URL prefix / scheme
"""
storage_name = self._provider["storage"].upper()
parse_result = urlparse(self._url)
if storage_name == "GCS":
return "gs://"
elif storage_name == "S3":
return "s3://"
elif storage_name == "HTTPS":
return "https://"
elif storage_name == "SSH" or storage_name == "SCP":
return "scp://"
elif storage_name == "SFTP":
return "sftp://"
elif storage_name == "WEBHDFS":
return "webhdfs://"
elif storage_name == "LOCAL":
return "file://"
elif parse_result.scheme:
return parse_result.scheme
logger.error(f"Unknown Storage provider in: {self.full_url}")
return ""
def _open_gcs_url(self, binary) -> object:
mode = "rb" if binary else "r"
service_account_json = self._provider.get("service_account_json")
credentials = None
if service_account_json:
try:
credentials = json.loads(self._provider["service_account_json"])
except json.decoder.JSONDecodeError as err:
error_msg = f"Failed to parse gcs service account json: {repr(err)}\n{traceback.format_exc()}"
logger.error(error_msg)
raise ConfigurationError(error_msg) from err
if credentials:
credentials = service_account.Credentials.from_service_account_info(credentials)
client = GCSClient(credentials=credentials, project=credentials._project_id)
else:
client = GCSClient.create_anonymous_client()
file_to_close = smart_open.open(self.full_url, transport_params=dict(client=client), mode=mode)
return file_to_close
def _open_aws_url(self, binary):
mode = "rb" if binary else "r"
aws_access_key_id = self._provider.get("aws_access_key_id")
aws_secret_access_key = self._provider.get("aws_secret_access_key")
use_aws_account = aws_access_key_id and aws_secret_access_key
if use_aws_account:
aws_access_key_id = self._provider.get("aws_access_key_id", "")
aws_secret_access_key = self._provider.get("aws_secret_access_key", "")
result = smart_open.open(f"{self.storage_scheme}{aws_access_key_id}:{aws_secret_access_key}@{self.url}", mode=mode)
else:
config = Config(signature_version=UNSIGNED)
params = {
"resource_kwargs": {"config": config},
}
result = smart_open.open(self.full_url, transport_params=params, mode=mode)
return result
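
# Hedged usage sketch, not part of the connector: shows how URLFile is meant to be used
# as a context manager, mirroring how Client.read opens files below. The S3 bucket and key
# are hypothetical placeholders, and anonymous access assumes the object is publicly readable.
def _example_read_public_s3_object(url: str = "s3://my_bucket/my_key") -> str:
    """Open a (hypothetical) public S3 object without credentials and return its text content."""
    reader = URLFile(url=url, provider={"storage": "S3"})
    with reader.open(binary=False) as fp:
        return fp.read()
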
class Client:
"""Class that manages reading and parsing data from streams"""
reader_class = URLFile
def __init__(self, dataset_name: str, url: str, provider: dict, format: str = None, reader_options: str = None):
self._dataset_name = dataset_name
self._url = url
self._provider = provider
self._reader_format = format or "csv"
self._reader_options = {}
if reader_options:
try:
self._reader_options = json.loads(reader_options)
except json.decoder.JSONDecodeError as err:
error_msg = f"Failed to parse reader options {repr(err)}\n{reader_options}\n{traceback.format_exc()}"
logger.error(error_msg)
raise ConfigurationError(error_msg) from err
@property
def stream_name(self) -> str:
if self._dataset_name:
return self._dataset_name
return f"file_{self._provider['storage']}.{self._reader_format}"
def load_nested_json_schema(self, fp) -> dict:
        # Use the Genson library to take JSON objects and generate schemas that describe them.
builder = SchemaBuilder()
if self._reader_format == "jsonl":
for o in self.read():
builder.add_object(o)
else:
builder.add_object(json.load(fp))
result = builder.to_schema()
if "items" in result and "properties" in result["items"]:
result = result["items"]["properties"]
return result
def load_nested_json(self, fp) -> list:
if self._reader_format == "jsonl":
result = []
line = fp.readline()
while line:
result.append(json.loads(line))
line = fp.readline()
else:
result = json.load(fp)
if not isinstance(result, list):
result = [result]
return result
def load_dataframes(self, fp, skip_data=False) -> Iterable:
"""load and return the appropriate pandas dataframe.
:param fp: file-like object to read from
:param skip_data: limit reading data
:return: a list of dataframe loaded from files described in the configuration
"""
readers = {
# pandas.read_csv additional arguments can be passed to customize how to parse csv.
# see https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_csv.html
"csv": pd.read_csv,
            # An option could be added to call pd.json_normalize to flatten semi-structured JSON data
            # into a flat table, by asking the user to specify how to flatten the nested columns.
"flat_json": pd.read_json,
"html": pd.read_html,
"excel": pd.read_excel,
"feather": pd.read_feather,
"parquet": pd.read_parquet,
"orc": pd.read_orc,
"pickle": pd.read_pickle,
}
try:
reader = readers[self._reader_format]
except KeyError as err:
error_msg = f"Reader {self._reader_format} is not supported\n{traceback.format_exc()}"
logger.error(error_msg)
raise ConfigurationError(error_msg) from err
reader_options = {**self._reader_options}
if self._reader_format == "csv":
reader_options["chunksize"] = 10000
if skip_data:
reader_options["nrows"] = 0
reader_options["index_col"] = 0
yield from reader(fp, **reader_options)
else:
yield reader(fp, **reader_options)
@staticmethod
def dtype_to_json_type(dtype) -> str:
"""Convert Pandas Dataframe types to Airbyte Types.
:param dtype: Pandas Dataframe type
:return: Corresponding Airbyte Type
"""
if dtype == object:
return "string"
elif dtype in ("int64", "float64"):
return "number"
elif dtype == "bool":
return "boolean"
return "string"
@property
def reader(self) -> reader_class:
return self.reader_class(url=self._url, provider=self._provider)
@property
def binary_source(self):
binary_formats = {"excel", "feather", "parquet", "orc", "pickle"}
return self._reader_format in binary_formats
def read(self, fields: Iterable = None) -> Iterable[dict]:
"""Read data from the stream"""
with self.reader.open(binary=self.binary_source) as fp:
if self._reader_format == "json" or self._reader_format == "jsonl":
yield from self.load_nested_json(fp)
else:
fields = set(fields) if fields else None
for df in self.load_dataframes(fp):
columns = fields.intersection(set(df.columns)) if fields else df.columns
df = df.replace(np.nan, "NaN", regex=True)
yield from df[columns].to_dict(orient="records")
def _stream_properties(self):
with self.reader.open(binary=self.binary_source) as fp:
if self._reader_format == "json" or self._reader_format == "jsonl":
return self.load_nested_json_schema(fp)
df_list = self.load_dataframes(fp, skip_data=False)
fields = {}
for df in df_list:
for col in df.columns:
fields[col] = self.dtype_to_json_type(df[col].dtype)
return {field: {"type": fields[field]} for field in fields}
@property
def streams(self) -> Iterable:
"""Discovers available streams"""
# TODO handle discovery of directories of multiple files instead
json_schema = {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": self._stream_properties(),
}
yield AirbyteStream(name=self.stream_name, json_schema=json_schema)
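
# Hedged end-to-end sketch (assumption: run manually, not executed by the connector itself).
# It demonstrates stream discovery and record reading for a hypothetical local CSV file;
# the path, dataset name, and reader options below are illustrative placeholders.
if __name__ == "__main__":
    example_client = Client(
        dataset_name="example_dataset",
        url="/tmp/example.csv",  # hypothetical local file
        provider={"storage": "local"},
        format="csv",
        reader_options='{"sep": ","}',  # forwarded to pandas.read_csv
    )
    for stream in example_client.streams:
        logger.info(f"Discovered stream: {stream.name}")
    for record in example_client.read():
        logger.info(f"Record: {record}")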