
Adapt Source File connector to JSON files to test normalization (#872)

Fix spec.json for Source File and prepare JSON loader for nested schema testing
Christophe Duong
2020-11-11 00:33:39 +01:00
committed by GitHub
parent 199a3a16e8
commit 70f0446677
9 changed files with 247 additions and 161 deletions


@@ -55,6 +55,7 @@ jobs:
ADWORDS_INTEGRATION_TEST_CREDS: ${{ secrets.ADWORDS_INTEGRATION_TEST_CREDS }}
FACEBOOK_MARKETING_API_TEST_INTEGRATION_CREDS: ${{ secrets.FACEBOOK_MARKETING_API_TEST_INTEGRATION_CREDS }}
SOURCE_MARKETO_SINGER_INTEGRATION_TEST_CONFIG: ${{ secrets.SOURCE_MARKETO_SINGER_INTEGRATION_TEST_CONFIG }}
AWS_S3_INTEGRATION_TEST_CREDS: ${{ secrets.AWS_S3_INTEGRATION_TEST_CREDS }}
- name: Build
run: ./gradlew --no-daemon build --scan


@@ -1,7 +1,10 @@
{
"filename": "integrationTestFile",
"format": "csv",
"reader_options": "{\"sep\": \",\", \"nrows\": 20}",
"storage": "HTTPS",
"url": "https://storage.googleapis.com/covid19-open-data/v2/latest/epidemiology.csv",
"reader_impl": "gcsfs"
"provider": {
"storage": "HTTPS",
"reader_impl": "gcsfs"
}
}
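
For orientation, a minimal sketch of the shape change applied to this integration config: storage-specific settings move from the top level into a nested "provider" object, mirroring the spec.json restructuring later in this commit (values are the test values above, written as Python dicts for illustration):

# Old, flat layout: storage and reader settings sit next to the file settings.
old_config = {
    "filename": "integrationTestFile",
    "format": "csv",
    "reader_options": '{"sep": ",", "nrows": 20}',
    "url": "https://storage.googleapis.com/covid19-open-data/v2/latest/epidemiology.csv",
    "storage": "HTTPS",
    "reader_impl": "gcsfs",
}

# New layout: the same settings grouped under "provider", which is what the
# updated spec.json's oneOf provider blocks now describe.
new_config = {
    "filename": "integrationTestFile",
    "format": "csv",
    "reader_options": '{"sep": ",", "nrows": 20}',
    "url": "https://storage.googleapis.com/covid19-open-data/v2/latest/epidemiology.csv",
    "provider": {"storage": "HTTPS", "reader_impl": "gcsfs"},
}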


@@ -54,7 +54,7 @@ class TestSourceFile(object):
os.remove(tmp_file.name)
print(f"\nLocal File {tmp_file.name} is now deleted")
# @pytest.fixture(scope="class")
@pytest.fixture(scope="class")
def create_gcs_private_data(self, download_gcs_public_data):
storage_client = storage.Client.from_service_account_json(self.service_account_file)
bucket_name = create_unique_gcs_bucket(storage_client, self.cloud_bucket_name)
@@ -66,7 +66,7 @@ class TestSourceFile(object):
bucket.delete(force=True)
print(f"\nGCS Bucket {bucket_name} is now deleted")
# @pytest.fixture(scope="class")
@pytest.fixture(scope="class")
def create_aws_private_data(self, download_gcs_public_data):
with open(self.aws_credentials) as json_file:
aws_config = json.load(json_file)
@@ -112,27 +112,27 @@ class TestSourceFile(object):
config["reader_impl"] = reader_impl
run_load_dataframes(config)
# @pytest.mark.parametrize("reader_impl", ["gcsfs", "smart_open"])
# def test_remote_gcs_load(self, create_gcs_private_data, reader_impl):
# config = get_config()
# config["storage"] = "GCS"
# config["url"] = create_gcs_private_data
# config["reader_impl"] = reader_impl
# with open(self.service_account_file) as json_file:
# config["service_account_json"] = json.dumps(json.load(json_file))
# run_load_dataframes(config)
@pytest.mark.parametrize("reader_impl", ["gcsfs", "smart_open"])
def test_remote_gcs_load(self, create_gcs_private_data, reader_impl):
config = get_config()
config["storage"] = "GCS"
config["url"] = create_gcs_private_data
config["reader_impl"] = reader_impl
with open(self.service_account_file) as json_file:
config["service_account_json"] = json.dumps(json.load(json_file))
run_load_dataframes(config)
# @pytest.mark.parametrize("reader_impl", ["s3fs", "smart_open"])
# def test_remote_aws_load(self, create_aws_private_data, reader_impl):
# config = get_config()
# config["storage"] = "S3"
# config["url"] = create_aws_private_data
# config["reader_impl"] = reader_impl
# with open(self.aws_credentials) as json_file:
# aws_config = json.load(json_file)
# config["aws_access_key_id"] = aws_config["aws_access_key_id"]
# config["aws_secret_access_key"] = aws_config["aws_secret_access_key"]
# run_load_dataframes(config)
@pytest.mark.parametrize("reader_impl", ["s3fs", "smart_open"])
def test_remote_aws_load(self, create_aws_private_data, reader_impl):
config = get_config()
config["storage"] = "S3"
config["url"] = create_aws_private_data
config["reader_impl"] = reader_impl
with open(self.aws_credentials) as json_file:
aws_config = json.load(json_file)
config["aws_access_key_id"] = aws_config["aws_access_key_id"]
config["aws_secret_access_key"] = aws_config["aws_secret_access_key"]
run_load_dataframes(config)
def run_load_dataframes(config):


@@ -0,0 +1,6 @@
{
"filename": "integrationTestFile",
"format": "json",
"provider": { "storage": "HTTPS" },
"url": "https://think.cs.vt.edu/corgis/datasets/json/airlines/airlines.json"
}
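
This nested JSON sample feeds the new schema-inference path: instead of flattening columns through pandas, discover now builds a schema with genson (added to setup.py below and wired into source.py further down). A standalone sketch of that inference on a toy record (the record here is illustrative, not the actual contents of airlines.json):

import json

from genson import SchemaBuilder

# Toy nested record standing in for one entry of the source file (illustrative only).
sample = [{"airport": {"code": "ATL", "name": "Atlanta GA"}, "statistics": {"flights": {"total": 100}}}]

builder = SchemaBuilder()
builder.add_object(sample)
schema = builder.to_schema()

# Mirroring load_nested_json_schema: when the top level is an array, keep the
# per-item properties and wrap them in a draft-07 object schema.
if "items" in schema and "properties" in schema["items"]:
    properties = schema["items"]["properties"]
else:
    properties = schema.get("properties", {})

json_schema = {
    "$schema": "http://json-schema.org/draft-07/schema#",
    "type": "object",
    "properties": properties,
}
print(json.dumps(json_schema, indent=2))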


@@ -1,4 +1,6 @@
-e ../../bases/airbyte-protocol
-e ../../bases/base-python
-e ../../bases/base-python-test
-e .
pytest
boto3


@@ -34,6 +34,7 @@ setup(
"airbyte-protocol",
"base-python",
"gcsfs",
"genson",
"google-cloud-storage",
"pandas>=0.24.1",
"paramiko",


@@ -34,6 +34,7 @@ import gcsfs
import pandas as pd
from airbyte_protocol import AirbyteCatalog, AirbyteConnectionStatus, AirbyteMessage, AirbyteRecordMessage, AirbyteStream, Status, Type
from base_python import Source
from genson import SchemaBuilder
from google.cloud.storage import Client
from s3fs import S3FileSystem
from smart_open import open
@@ -93,11 +94,11 @@ class SourceFile(Source):
:return:
"""
config = config_container.rendered_config
storage = SourceFile.get_storage_scheme(logger, config["storage"], config["url"])
storage = SourceFile.get_storage_scheme(logger, config["provider"]["storage"], config["url"])
url = SourceFile.get_simple_url(config["url"])
logger.info(f"Checking access to {storage}{url}...")
try:
SourceFile.load_dataframes(config, logger, skip_data=True)
SourceFile.open_file_url(config, logger)
return AirbyteConnectionStatus(status=Status.SUCCEEDED)
except Exception as err:
reason = f"Failed to load {storage}{url}: {repr(err)}\n{traceback.format_exc()}"
@@ -112,24 +113,32 @@ class SourceFile(Source):
:return:
"""
config = config_container.rendered_config
storage = SourceFile.get_storage_scheme(logger, config["storage"], config["url"])
storage = SourceFile.get_storage_scheme(logger, config["provider"]["storage"], config["url"])
url = SourceFile.get_simple_url(config["url"])
name = SourceFile.get_stream_name(config)
logger.info(f"Discovering schema of {name} at {storage}{url}...")
streams = []
try:
# TODO handle discovery of directories of multiple files instead
# Don't skip data when discovering in order to infer column types
df_list = SourceFile.load_dataframes(config, logger, skip_data=False)
fields = {}
for df in df_list:
for col in df.columns:
fields[col] = SourceFile.convert_dtype(df[col].dtype)
json_schema = {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {field: {"type": fields[field]} for field in fields},
}
if "format" in config and config["format"] == "json":
schema = SourceFile.load_nested_json_schema(config, logger)
json_schema = {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": schema,
}
else:
# Don't skip data when discovering in order to infer column types
df_list = SourceFile.load_dataframes(config, logger, skip_data=False)
fields = {}
for df in df_list:
for col in df.columns:
fields[col] = SourceFile.convert_dtype(df[col].dtype)
json_schema = {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {field: {"type": fields[field]} for field in fields},
}
streams.append(AirbyteStream(name=name, json_schema=json_schema))
except Exception as err:
reason = f"Failed to discover schemas of {name} at {storage}{url}: {repr(err)}\n{traceback.format_exc()}"
@@ -147,24 +156,32 @@ class SourceFile(Source):
:return:
"""
config = config_container.rendered_config
storage = SourceFile.get_storage_scheme(logger, config["storage"], config["url"])
storage = SourceFile.get_storage_scheme(logger, config["provider"]["storage"], config["url"])
url = SourceFile.get_simple_url(config["url"])
name = SourceFile.get_stream_name(config)
logger.info(f"Reading {name} ({storage}{url}, {catalog_path}, {state_path})...")
catalog = AirbyteCatalog.parse_obj(self.read_config(catalog_path))
selection = SourceFile.parse_catalog(catalog)
try:
df_list = SourceFile.load_dataframes(config, logger)
for df in df_list:
if len(selection) > 0:
columns = selection.intersection(set(df.columns))
else:
columns = df.columns
for data in df[columns].to_dict(orient="records"):
if "format" in config and config["format"] == "json":
data_list = SourceFile.load_nested_json(config, logger)
for data in data_list:
yield AirbyteMessage(
type=Type.RECORD,
record=AirbyteRecordMessage(stream=name, data=data, emitted_at=int(datetime.now().timestamp()) * 1000),
)
else:
df_list = SourceFile.load_dataframes(config, logger)
for df in df_list:
if len(selection) > 0:
columns = selection.intersection(set(df.columns))
else:
columns = df.columns
for data in df[columns].to_dict(orient="records"):
yield AirbyteMessage(
type=Type.RECORD,
record=AirbyteRecordMessage(stream=name, data=data, emitted_at=int(datetime.now().timestamp()) * 1000),
)
except Exception as err:
reason = f"Failed to read data of {name} at {storage}{url}: {repr(err)}\n{traceback.format_exc()}"
logger.error(reason)
@@ -178,9 +195,113 @@ class SourceFile(Source):
reader_format = "csv"
if "format" in config:
reader_format = config["format"]
name = f"file_{config['storage']}.{reader_format}"
name = f"file_{config['provider']['storage']}.{reader_format}"
return name
@staticmethod
def open_file_url(config, logger):
storage = SourceFile.get_storage_scheme(logger, config["provider"]["storage"], config["url"])
url = SourceFile.get_simple_url(config["url"])
gcs_file = None
use_gcs_service_account = "service_account_json" in config["provider"] and storage == "gs://"
use_aws_account = "aws_access_key_id" in config["provider"] and "aws_secret_access_key" in config["provider"] and storage == "s3://"
# default reader impl
reader_impl = ""
if "reader_impl" in config["provider"]:
reader_impl = config["provider"]["reader_impl"]
if reader_impl == "gcsfs":
if use_gcs_service_account:
try:
token_dict = json.loads(config["provider"]["service_account_json"])
fs = gcsfs.GCSFileSystem(token=token_dict)
gcs_file = fs.open(f"gs://{url}")
result = gcs_file
except json.decoder.JSONDecodeError as err:
logger.error(f"Failed to parse gcs service account json: {repr(err)}\n{traceback.format_exc()}")
raise err
else:
result = open(f"{storage}{url}")
elif reader_impl == "s3fs":
if use_aws_account:
aws_access_key_id = None
if "aws_access_key_id" in config["provider"]:
aws_access_key_id = config["provider"]["aws_access_key_id"]
aws_secret_access_key = None
if "aws_secret_access_key" in config["provider"]:
aws_secret_access_key = config["provider"]["aws_secret_access_key"]
s3 = S3FileSystem(anon=False, key=aws_access_key_id, secret=aws_secret_access_key)
result = s3.open(f"s3://{url}", mode="r")
else:
result = open(f"{storage}{url}")
else: # using smart_open
if use_gcs_service_account:
try:
credentials = json.dumps(json.loads(config["provider"]["service_account_json"]))
tmp_service_account = tempfile.NamedTemporaryFile(delete=False)
with open(tmp_service_account, "w") as f:
f.write(credentials)
tmp_service_account.close()
client = Client.from_service_account_json(tmp_service_account.name)
result = open(f"gs://{url}", transport_params=dict(client=client))
os.remove(tmp_service_account.name)
except json.decoder.JSONDecodeError as err:
logger.error(f"Failed to parse gcs service account json: {repr(err)}\n{traceback.format_exc()}")
raise err
elif use_aws_account:
aws_access_key_id = ""
if "aws_access_key_id" in config["provider"]:
aws_access_key_id = config["provider"]["aws_access_key_id"]
aws_secret_access_key = ""
if "aws_secret_access_key" in config["provider"]:
aws_secret_access_key = config["provider"]["aws_secret_access_key"]
result = open(f"s3://{aws_access_key_id}:{aws_secret_access_key}@{url}")
elif storage == "webhdfs://":
host = config["provider"]["host"]
port = config["provider"]["port"]
result = open(f"webhdfs://{host}:{port}/{url}")
elif storage == "ssh://" or storage == "scp://" or storage == "sftp://":
user = config["provider"]["user"]
host = config["provider"]["host"]
if "password" in config["provider"]:
password = config["provider"]["password"]
# Explicitly turn off ssh keys stored in ~/.ssh
transport_params = {"connect_kwargs": {"look_for_keys": False}}
result = open(f"{storage}{user}:{password}@{host}/{url}", transport_params=transport_params)
else:
result = open(f"{storage}{user}@{host}/{url}")
else:
result = open(f"{storage}{url}")
return result, gcs_file
@staticmethod
def load_nested_json_schema(config, logger) -> dict:
url, gcs_file = SourceFile.open_file_url(config, logger)
try:
# Use Genson Library to take JSON objects and generate schemas that describe them,
builder = SchemaBuilder()
builder.add_object(json.load(url))
result = builder.to_schema()
if "items" in result and "properties" in result["items"]:
result = result["items"]["properties"]
finally:
if gcs_file:
gcs_file.close()
return result
@staticmethod
def load_nested_json(config, logger) -> list:
url, gcs_file = SourceFile.open_file_url(config, logger)
try:
result = json.load(url)
if isinstance(result, dict):
result = [result]
finally:
if gcs_file:
gcs_file.close()
return result
@staticmethod
def load_dataframes(config, logger, skip_data=False) -> List:
"""From an Airbyte Configuration file, load and return the appropriate pandas dataframe.
@@ -190,13 +311,6 @@ class SourceFile(Source):
:param logger:
:return: a list of dataframe loaded from files described in the configuration
"""
storage = SourceFile.get_storage_scheme(logger, config["storage"], config["url"])
url = SourceFile.get_simple_url(config["url"])
gcs_file = None
use_gcs_service_account = "service_account_json" in config and storage == "gs://"
use_aws_account = "aws_access_key_id" in config and "aws_secret_access_key" in config and storage == "s3://"
# default format reader
reader_format = "csv"
if "format" in config:
@@ -210,74 +324,7 @@ class SourceFile(Source):
if skip_data and reader_format == "csv":
reader_options["nrows"] = 0
reader_options["index_col"] = 0
# default reader impl
reader_impl = ""
if "reader_impl" in config:
reader_impl = config["reader_impl"]
if reader_impl == "gcsfs":
if use_gcs_service_account:
try:
token_dict = json.loads(config["service_account_json"])
fs = gcsfs.GCSFileSystem(token=token_dict)
gcs_file = fs.open(f"gs://{url}")
url = gcs_file
except json.decoder.JSONDecodeError as err:
logger.error(f"Failed to parse gcs service account json: {repr(err)}\n{traceback.format_exc()}")
raise err
else:
url = open(f"{storage}{url}")
elif reader_impl == "s3fs":
if use_aws_account:
aws_access_key_id = None
if "aws_access_key_id" in config:
aws_access_key_id = config["aws_access_key_id"]
aws_secret_access_key = None
if "aws_secret_access_key" in config:
aws_secret_access_key = config["aws_secret_access_key"]
s3 = S3FileSystem(anon=False, key=aws_access_key_id, secret=aws_secret_access_key)
url = s3.open(f"s3://{url}", mode="r")
else:
url = open(f"{storage}{url}")
else: # using smart_open
if use_gcs_service_account:
try:
credentials = json.dumps(json.loads(config["service_account_json"]))
tmp_service_account = tempfile.NamedTemporaryFile(delete=False)
with open(tmp_service_account, "w") as f:
f.write(credentials)
tmp_service_account.close()
client = Client.from_service_account_json(tmp_service_account.name)
url = open(f"gs://{url}", transport_params=dict(client=client))
os.remove(tmp_service_account.name)
except json.decoder.JSONDecodeError as err:
logger.error(f"Failed to parse gcs service account json: {repr(err)}\n{traceback.format_exc()}")
raise err
elif use_aws_account:
aws_access_key_id = ""
if "aws_access_key_id" in config:
aws_access_key_id = config["aws_access_key_id"]
aws_secret_access_key = ""
if "aws_secret_access_key" in config:
aws_secret_access_key = config["aws_secret_access_key"]
url = open(f"s3://{aws_access_key_id}:{aws_secret_access_key}@{url}")
elif storage == "webhdfs://":
host = config["host"]
port = config["port"]
url = open(f"webhdfs://{host}:{port}/{url}")
elif storage == "ssh://" or storage == "scp://" or storage == "sftp://":
user = config["user"]
host = config["host"]
if "password" in config:
password = config["password"]
# Explicitly turn off ssh keys stored in ~/.ssh
transport_params = {"connect_kwargs": {"look_for_keys": False}}
url = open(f"{storage}{user}:{password}@{host}/{url}", transport_params=transport_params)
else:
url = open(f"{storage}{user}@{host}/{url}")
else:
url = open(f"{storage}{url}")
url, gcs_file = SourceFile.open_file_url(config, logger)
try:
result = SourceFile.parse_file(logger, reader_format, url, reader_options)
finally:
@@ -292,7 +339,9 @@ class SourceFile(Source):
# pandas.read_csv additional arguments can be passed to customize how to parse csv.
# see https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_csv.html
result.append(pd.read_csv(url, **reader_options))
elif reader_format == "json":
elif reader_format == "flat_json":
# We can add option to call to pd.normalize_json to normalize semi-structured JSON data into a flat table
# by asking user to specify how to flatten the nested columns
result.append(pd.read_json(url, **reader_options))
elif reader_format == "html":
result += pd.read_html(url, **reader_options)
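
A note on reader_options, which the integration configs above use: it is a JSON string of keyword arguments that parse_file forwards to the matching pandas read_* call. A minimal sketch of what that amounts to for the CSV test config (pandas and the URL are the same ones the connector already uses):

import json

import pandas as pd

config = {
    "format": "csv",
    "reader_options": '{"sep": ",", "nrows": 20}',
    "url": "https://storage.googleapis.com/covid19-open-data/v2/latest/epidemiology.csv",
}

# parse_file effectively runs pd.read_csv(url, **reader_options) for "csv",
# and pd.read_json(url, **reader_options) for the renamed "flat_json" format.
reader_options = json.loads(config["reader_options"])
df = pd.read_csv(config["url"], **reader_options)
print(df.shape)  # at most 20 rows because of nrows=20

The flattening option the flat_json comment alludes to would go through pandas' json_normalize, should it be exposed later.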


@@ -5,9 +5,13 @@
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "File Source Spec",
"type": "object",
"required": ["url", "storage"],
"additionalProperties": true,
"additionalProperties": false,
"required": ["filename", "format", "url", "provider"],
"properties": {
"filename": {
"type": "string",
"description": "Name of the final table where to replicate this file (should include only letters, numbers dash and underscores)"
},
"format": {
"type": "string",
"enum": [
@@ -21,47 +25,41 @@
"pickle"
],
"default": "csv",
"description": "File Format of the file to be replicated. Common formats are (csv, json or excel) but more advanced formats can be specified (html, parquet, orc, feather, pickle)",
"examples": ["csv"]
"description": "File Format of the file to be replicated."
},
"reader_options": {
"type": "string",
"description": "Parsers for File Formats are currently using the `read_*` methods from the Pandas Library. Each of these readers provides additional options that can be specified as part of this JSON string. As an example, it is possible to change the read_csv behavior to a TSV (tab separated instead of comma) when redefining the delimiter character. See documentation of each `read_*` primitive from here: https://pandas.pydata.org/pandas-docs/stable/user_guide/io.html",
"examples": ["{}", "{'sep': ' '}"]
},
"storage": {
"type": "string",
"enum": ["HTTPS", "GCS", "S3", "SSH", "SFTP", "WebHDFS", "local"],
"description": "Storage Provider or Location of the file(s) to be replicated. (Note that local storage of directory where csv files will be read must start with the local mount \"/local\" at the moment until we implement more advanced mounting options)",
"default": "HTTPS"
},
"url": {
"type": "string",
"description": "URL path to access the file to be replicated"
},
"filename": {
"type": "string",
"description": "Name of the file (should include only letters, numbers dash and underscores)"
}
},
"dependencies": {
"storage": {
"provider": {
"type": "object",
"description": "Storage Provider or Location of the file(s) to be replicated.",
"default": "Public Web",
"oneOf": [
{
"title": "Public Web",
"required": ["storage"],
"properties": {
"storage": {
"enum": ["HTTPS"]
"type": "string",
"enum": ["HTTPS"],
"default": "HTTPS"
}
}
},
{
"title": "Google Cloud Storage",
"required": ["storage", "reader_impl"],
"properties": {
"storage": {
"enum": ["GCS"]
"type": "string",
"enum": ["GCS"],
"default": "GCS"
},
"service_account_json": {
"type": "string",
@@ -75,11 +73,14 @@
}
}
},
{
"title": "Amazon Web Services",
"required": ["storage", "reader_impl"],
"properties": {
"storage": {
"enum": ["S3"]
"type": "string",
"enum": ["S3"],
"default": "S3"
},
"aws_access_key_id": {
"type": "string",
@@ -97,11 +98,14 @@
}
}
},
{
"title": "Secure Shell",
"required": ["storage", "user", "host"],
"properties": {
"storage": {
"enum": ["SSH"]
"type": "string",
"enum": ["SSH"],
"default": "SSH"
},
"user": {
"type": "string"
@@ -112,14 +116,16 @@
"host": {
"type": "string"
}
},
"required": ["user", "host"]
}
},
{
"title": "Secure File Transfer Protocol",
"required": ["storage", "user", "host"],
"properties": {
"storage": {
"enum": ["SFTP"]
"type": "string",
"enum": ["SFTP"],
"default": "SFTP"
},
"user": {
"type": "string"
@@ -130,14 +136,16 @@
"host": {
"type": "string"
}
},
"required": ["user", "host"]
}
},
{
"title": "WebHDFS",
"required": ["storage", "host", "port"],
"properties": {
"storage": {
"enum": ["WebHDFS"]
"type": "string",
"enum": ["WebHDFS"],
"default": "WebHDFS"
},
"host": {
"type": "string"
@@ -145,8 +153,19 @@
"port": {
"type": "number"
}
},
"required": ["host", "port"]
}
},
{
"title": "Local Filesystem",
"description": "(Note that local storage of directory where csv files will be read must start with the local mount \"/local\" at the moment until we implement more advanced mounting options)",
"required": ["storage"],
"properties": {
"storage": {
"type": "string",
"enum": ["local"],
"default": "local"
}
}
}
]
}
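
For reference, two configs that would satisfy the restructured spec (required root fields are filename, format, url and provider), written as Python dicts with placeholder values for illustration:

# Amazon Web Services provider: reader_impl is required alongside storage;
# key values here are placeholders, not real credentials.
s3_config = {
    "filename": "my_s3_table",
    "format": "csv",
    "url": "my-bucket/path/to/file.csv",
    "provider": {
        "storage": "S3",
        "reader_impl": "s3fs",
        "aws_access_key_id": "<access key id>",
        "aws_secret_access_key": "<secret access key>",
    },
}

# Secure File Transfer Protocol provider: user and host are required, password stays optional.
sftp_config = {
    "filename": "my_sftp_table",
    "format": "json",
    "url": "path/to/file.json",
    "provider": {
        "storage": "SFTP",
        "user": "someuser",
        "host": "example.com",
    },
}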


@@ -38,3 +38,8 @@ echo "$FACEBOOK_MARKETING_API_TEST_INTEGRATION_CREDS" > "${FB_SECRETS_DIR}/confi
MKTO_SECRETS_DIR=airbyte-integrations/connectors/source-marketo-singer/secrets
mkdir $MKTO_SECRETS_DIR
echo "$SOURCE_MARKETO_SINGER_INTEGRATION_TEST_CONFIG" > "${MKTO_SECRETS_DIR}/config.json"
SOURCEFILE_DIR=airbyte-integrations/connectors/source-file/secrets
mkdir $SOURCEFILE_DIR
echo "$BIGQUERY_INTEGRATION_TEST_CREDS" > "${SOURCEFILE_DIR}/gcs.json"
echo "$AWS_S3_INTEGRATION_TEST_CREDS" > "${SOURCEFILE_DIR}/aws.json"