#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
|
|
|
|
|
|
from dataclasses import dataclass
|
|
from typing import Any, List, Mapping, Optional, Union
|
|
|
|
import requests
|
|
|
|
from airbyte_cdk.models import FailureType
|
|
from airbyte_cdk.sources.declarative.extractors.record_extractor import RecordExtractor
|
|
from airbyte_cdk.sources.declarative.requesters.error_handlers import DefaultErrorHandler
|
|
from airbyte_cdk.sources.declarative.requesters.error_handlers.default_http_response_filter import DefaultHttpResponseFilter
|
|
from airbyte_cdk.sources.declarative.schema.schema_loader import SchemaLoader
|
|
from airbyte_cdk.sources.declarative.types import Record
|
|
from airbyte_cdk.sources.streams.http.error_handlers.response_models import (
|
|
SUCCESS_RESOLUTION,
|
|
ErrorResolution,
|
|
ResponseAction,
|
|
create_fallback_error_resolution,
|
|
)
|
|
from airbyte_cdk.sources.types import Config
|
|
|
|
|
|
class USCensusRecordExtractor(RecordExtractor):
    """
    Parses the response from the us census website.

    The US Census provides data in an atypical format,
    which motivated the creation of this source rather
    than using a generic http source.

    * Data are represented in a two-dimensional array
    * Square brackets [ ] hold arrays
    * Values are separated by a , (comma).

    e.g.
    [["STNAME","POP","DATE_","state"],
    ["Alabama","4849377","7","01"],
    ["Alaska","736732","7","02"],
    ["Arizona","6731484","7","04"],
    ["Arkansas","2966369","7","05"],
    ["California","38802500","7","06"]]
    """

    def extract_records(self, response: requests.Response) -> List[Record]:  # type: ignore
        """
        Lazily yield one dictionary per data row of the response.

        The body is consumed piece by piece through ``response.iter_content``
        and fed to a small state machine. The first bracketed row is taken as
        the header; every following non-empty row is zipped with that header
        and yielded. Quotes are stripped and no type conversion is performed,
        so every value is a string. If a row has more or fewer values than the
        header, the surplus on either side is dropped by ``zip``.

        Despite the ``List[Record]`` annotation (kept for interface
        compatibility) this is a generator.

        :param response: the HTTP response whose body holds the 2D array
        :return: an iterator of ``{header_name: value}`` mappings
        """
        # Where we accumulate a "row" of data until we encounter ']'
        buffer = ""
        # The first row of the table; its entries become the keys of every
        # record we produce.
        header: List[str] = []
        # Structural characters which carry no data and must not end up in
        # the buffer (unless they are escaped or part of a quoted value).
        ignore_chars = ("[", "\n")
        # Context: was the previous character an unescaped backslash?
        is_prev_escape = False
        # Context: are we currently between double quotes?
        is_in_quotes = False
        # Stand-in for commas that belong to a value, so the .split(",")
        # calls below only split on real separators.
        comma_placeholder = "||comma_placeholder||"

        # NOTE(review): with decode_unicode=True requests still yields raw
        # bytes when the response declares no encoding; this parser assumes
        # str chunks -- confirm the API always sends a charset.
        for response_chunk in response.iter_content(decode_unicode=True):
            if response_chunk == "\\" and not is_prev_escape:
                # Start of an escape sequence. An already-escaped backslash
                # ("\\") must NOT re-arm the flag, otherwise a value ending in
                # a backslash would swallow its closing quote.
                is_prev_escape = True
                continue
            elif response_chunk == '"' and not is_prev_escape:
                # An unescaped quote opens or closes a quoted value.
                is_in_quotes = not is_in_quotes
            elif response_chunk == "," and is_in_quotes:
                buffer += comma_placeholder
            elif response_chunk in ignore_chars and not is_prev_escape and not is_in_quotes:
                # Pure structure: skip it. Inside quotes these characters are
                # data and fall through to the final branch.
                pass
            elif response_chunk == "]" and not is_in_quotes:
                # End of a row (a ']' inside a quoted value is just data).
                if not header:
                    header = [name.replace(comma_placeholder, ",") for name in buffer.split(",")]
                elif buffer:
                    # Data rows start with the comma that separated them from
                    # the previous row, hence the empty first item is dropped.
                    values = [value.replace(comma_placeholder, ",") for value in buffer.split(",")[1:]]
                    yield dict(zip(header, values))
                buffer = ""
            else:
                # Regular data, including the character following a backslash
                # (escape sequences are kept literally, not translated).
                buffer += response_chunk
            is_prev_escape = False
|
|
|
|
|
|
class USCensusErrorHandler(DefaultErrorHandler):
    """
    Custom error handler that fails the sync when an invalid API key is used.

    For an invalid key the API answers with status code 200, so airbyte
    would not raise an error on its own.
    """

    def interpret_response(self, response_or_exception: Optional[Union[requests.Response, Exception]]) -> ErrorResolution:
        """
        Map a response (or exception) to the resolution the requester should apply.

        Order of precedence: the configured response filters, then the
        "Invalid Key" check on successful responses, then the default HTTP
        response filter, and finally the generic fallback resolution.
        """
        # Configured filters win as soon as one of them matches.
        for configured_filter in self.response_filters or []:
            resolution = configured_filter.matches(response_or_exception=response_or_exception)
            if resolution:
                return resolution

        if isinstance(response_or_exception, requests.Response) and response_or_exception.ok:
            if "Invalid Key" not in response_or_exception.text:
                return SUCCESS_RESOLUTION
            # A successful status whose body reports an invalid key is a
            # configuration problem.
            return ErrorResolution(
                response_action=ResponseAction.FAIL,
                failure_type=FailureType.config_error,
                error_message="Failed to perform request. Error: Valid API Key needed.",
            )

        default_filter = DefaultHttpResponseFilter(parameters={}, config=self.config)
        default_resolution = default_filter.matches(response_or_exception)
        if default_resolution:
            return default_resolution
        return create_fallback_error_resolution(response_or_exception)
|
|
|
|
|
|
@dataclass
class USCensusSchema(SchemaLoader):
    """
    The US Census website hosts many APIs: https://www.census.gov/data/developers/data-sets.html

    These APIs return data in a non standard format.
    We create the JSON schema dynamically from the ``query_params`` entry of
    the connector configuration.

    In this implementation all properties are of type "string", but this function could
    be changed to try and infer the data type based on the values it finds.
    """

    # Connector configuration; only its "query_params" entry is read here.
    config: Config

    def get_json_schema(self) -> Mapping[str, Any]:
        """
        Build a draft-07 JSON schema whose properties are all strings.

        ``query_params`` is split on ``&`` and each ``key=value`` segment
        contributes property names as follows:

        * ``get``: every comma-separated name in the value
        * ``for``: the part of the value before the first ``:``
        * anything else: the key itself

        Empty segments (stray or trailing ``&``) are skipped and a segment
        without ``=`` is treated as a key with an empty value, rather than
        raising ``ValueError``.

        :return: the schema mapping, with ``additionalProperties`` enabled
        """
        query_params = self.config.get("query_params")
        if query_params:
            parameters: List[str] = []
            for part in query_params.split("&"):
                if not part:
                    # Tolerate "a=1&&b=2" and a trailing "&".
                    continue
                # partition never fails to unpack, unlike split("=", 1).
                key, _, value = part.partition("=")
                if key == "get":
                    parameters.extend(value.split(","))
                elif key == "for":
                    parameters.append(value.split(":")[0])
                else:
                    parameters.append(key)
            json_schema = {name: {"type": "string"} for name in parameters}
        else:
            # NOTE(review): presumably this key mirrors the single column the
            # extractor yields for the API's default response when no query
            # is given -- confirm.
            json_schema = {"{ @context: https://project-open-data.cio.gov/v1.1/schema/catalog.jsonld": {"type": "string"}}
        return {
            "$schema": "http://json-schema.org/draft-07/schema#",
            "additionalProperties": True,
            "type": "object",
            "properties": json_schema,
        }
|