# airbyte-integrations/connectors/source-us-census/components.py
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

from dataclasses import dataclass
from typing import Any, List, Mapping, Optional, Union

import requests

from airbyte_cdk.models import FailureType
from airbyte_cdk.sources.declarative.extractors.record_extractor import RecordExtractor
from airbyte_cdk.sources.declarative.requesters.error_handlers import DefaultErrorHandler
from airbyte_cdk.sources.declarative.requesters.error_handlers.default_http_response_filter import DefaultHttpResponseFilter
from airbyte_cdk.sources.declarative.schema.schema_loader import SchemaLoader
from airbyte_cdk.sources.declarative.types import Record
from airbyte_cdk.sources.streams.http.error_handlers.response_models import (
    SUCCESS_RESOLUTION,
    ErrorResolution,
    ResponseAction,
    create_fallback_error_resolution,
)
from airbyte_cdk.sources.types import Config


class USCensusRecordExtractor(RecordExtractor):
    """
    Parses the response from the US Census website.
    The US Census provides data in an atypical format,
    which motivated the creation of this source rather
    than using a generic http source.
    * Data are represented in a two-dimensional array
    * Square brackets [ ] hold arrays
    * Values are separated by a , (comma).
    e.g.
    [["STNAME","POP","DATE_","state"],
    ["Alabama","4849377","7","01"],
    ["Alaska","736732","7","02"],
    ["Arizona","6731484","7","04"],
    ["Arkansas","2966369","7","05"],
    ["California","38802500","7","06"]]
    """

    def extract_records(self, response: requests.Response) -> List[Record]:  # type: ignore
        # Where we accumulate a "row" of data until we encounter ']'
        buffer = ""
        # The response is in a tabular format where the first list of strings
        # is the "header" of the table, which we use as keys in the
        # dictionaries we produce
        header = []
        # Characters with special meanings which should not be added to the
        # buffer of values
        ignore_chars = [
            "[",
            "\n",
        ]
        # Context: whether the previous character was an escape character
        is_prev_escape = False
        # Context: whether we are currently inside double quotes
        is_in_quotes = False
        # Placeholder used to mark commas that appear within values, so the
        # .split(",") call does not produce erroneous values
        comma_placeholder = "||comma_placeholder||"
        for response_chunk in response.iter_content(decode_unicode=True):
            if response_chunk == "\\":
                is_prev_escape = True
                continue
            elif response_chunk == '"' and not is_prev_escape:
                # If we are within quotes and encounter closing quotes, we are
                # no longer within quotes; otherwise we are now within quotes.
                is_in_quotes = not is_in_quotes
            elif response_chunk == "," and is_in_quotes:
                buffer += comma_placeholder
            elif response_chunk in ignore_chars and not is_prev_escape:
                pass
            elif response_chunk == "]":
                if not header:
                    header = buffer.split(",")
                elif buffer:
                    # Drop the first split element, which is empty because the
                    # buffer starts with the comma separating this row from
                    # the previous one.
                    split_values = buffer.split(",")[1:]
                    # Add back commas originally embedded in values
                    split_values = map(
                        lambda x: x.replace(comma_placeholder, ","),
                        split_values,
                    )
                    # Zip the values we found with the "header"
                    yield dict(
                        zip(
                            header,
                            split_values,
                        )
                    )
                buffer = ""
            else:
                buffer += response_chunk
            is_prev_escape = False
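

# Illustrative only: a minimal sketch of how the extractor above turns the tabular
# payload into records. ``_FakeCensusResponse`` and the sample values are made up for
# this sketch (the second row deliberately contains a comma inside a value to exercise
# the ``comma_placeholder`` path); nothing here is executed when the connector runs.
if __name__ == "__main__":

    class _FakeCensusResponse:
        """Feeds a canned payload one character at a time, like Response.iter_content."""

        def __init__(self, text: str) -> None:
            self._text = text

        def iter_content(self, decode_unicode: bool = True):
            return iter(self._text)

    _payload = '[["STNAME","POP"],\n["Alabama","4849377"],\n["Anchorage, AK","291826"]]'
    _records = list(USCensusRecordExtractor().extract_records(_FakeCensusResponse(_payload)))  # type: ignore[arg-type]
    # -> [{'STNAME': 'Alabama', 'POP': '4849377'}, {'STNAME': 'Anchorage, AK', 'POP': '291826'}]
    print(_records)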


class USCensusErrorHandler(DefaultErrorHandler):
    """
    This custom error handler raises an error when an invalid API key is used.

    In such cases the status code is 200, so Airbyte would not raise an error on its own.
    """

    def interpret_response(self, response_or_exception: Optional[Union[requests.Response, Exception]]) -> ErrorResolution:
        if self.response_filters:
            for response_filter in self.response_filters:
                matched_error_resolution = response_filter.matches(response_or_exception=response_or_exception)
                if matched_error_resolution:
                    return matched_error_resolution

        if isinstance(response_or_exception, requests.Response):
            if response_or_exception.ok:
                if "Invalid Key" in response_or_exception.text:
                    return ErrorResolution(
                        response_action=ResponseAction.FAIL,
                        failure_type=FailureType.config_error,
                        error_message="Failed to perform request. Error: Valid API Key needed.",
                    )
                return SUCCESS_RESOLUTION

        default_response_filter = DefaultHttpResponseFilter(parameters={}, config=self.config)
        default_response_filter_resolution = default_response_filter.matches(response_or_exception)

        return (
            default_response_filter_resolution
            if default_response_filter_resolution
            else create_fallback_error_resolution(response_or_exception)
        )
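

# A sketch of the behaviour this handler adds on top of DefaultErrorHandler, assuming
# the declarative CDK's usual ``config``/``parameters`` constructor arguments. Kept as a
# comment rather than executable code because the exact constructor signature depends on
# the installed CDK version:
#
#     handler = USCensusErrorHandler(config={}, parameters={})
#     fake = requests.Response()
#     fake.status_code = 200              # the API reports success...
#     fake._content = b"Invalid Key ..."  # ...but the body signals a bad API key
#     resolution = handler.interpret_response(fake)
#     # resolution.response_action == ResponseAction.FAIL
#     # resolution.failure_type == FailureType.config_error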


@dataclass
class USCensusSchema(SchemaLoader):
    """
    The US Census website hosts many APIs: https://www.census.gov/data/developers/data-sets.html

    These APIs return data in a non-standard format.
    We create the JSON schemas dynamically by reading the first "row" of data we get.
    In this implementation all records are of type "string", but this function could
    be changed to try and infer the data type based on the values it finds.
    """

    config: Config

    def get_json_schema(self) -> Mapping[str, Any]:
        query_params = self.config.get("query_params")
        if query_params:
            parts = query_params.split("&")
            parameters = []
            for part in parts:
                key, value = part.split("=", 1)
                if key == "get":
                    parameters += value.split(",")
                elif key == "for":
                    parameters.append(value.split(":")[0])
                else:
                    parameters.append(key)
            json_schema = {k: {"type": "string"} for k in parameters}
        else:
            json_schema = {"{ @context: https://project-open-data.cio.gov/v1.1/schema/catalog.jsonld": {"type": "string"}}
        return {
            "$schema": "http://json-schema.org/draft-07/schema#",
            "additionalProperties": True,
            "type": "object",
            "properties": json_schema,
        }
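

# Illustrative only: a worked example of how ``get_json_schema`` derives property names
# from the connector's ``query_params`` setting. The query string below is made up for
# this sketch; nothing here runs as part of the connector.
if __name__ == "__main__":
    _schema = USCensusSchema(config={"query_params": "get=NAME,POP&for=state:*&key=SOME_KEY"}).get_json_schema()
    # "get" contributes NAME and POP, "for" contributes "state", and every other key
    # (here "key") is added verbatim; all properties are typed as strings.
    print(sorted(_schema["properties"]))  # -> ['NAME', 'POP', 'key', 'state']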