105 lines
3.9 KiB
Python
105 lines
3.9 KiB
Python
#
|
|
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
|
|
#
|
|
|
|
import re
|
|
from dataclasses import InitVar, dataclass
|
|
from datetime import date, datetime, time, timedelta, timezone
|
|
from typing import Any, Dict, List, Mapping, Union
|
|
|
|
import dpath.util
|
|
import requests
|
|
|
|
from airbyte_cdk.sources.declarative.decoders.decoder import Decoder
|
|
from airbyte_cdk.sources.declarative.decoders.json_decoder import JsonDecoder
|
|
from airbyte_cdk.sources.declarative.extractors.record_extractor import RecordExtractor
|
|
from airbyte_cdk.sources.declarative.interpolation import InterpolatedString
|
|
from airbyte_cdk.sources.declarative.types import Config
|
|
|
|
|
|
class ParseDates:
|
|
@staticmethod
|
|
def parse_date(value):
|
|
# Xero datetimes can be .NET JSON date strings which look like
|
|
# "/Date(1419937200000+0000)/"
|
|
# https://developer.xero.com/documentation/api/requests-and-responses
|
|
pattern = r"Date\((\-?\d+)([-+])?(\d+)?\)"
|
|
match = re.search(pattern, value)
|
|
iso8601pattern = r"((\d{4})-([0-2]\d)-0?([0-3]\d)T([0-5]\d):([0-5]\d):([0-6]\d))"
|
|
if not match:
|
|
iso8601match = re.search(iso8601pattern, value)
|
|
if iso8601match:
|
|
try:
|
|
return datetime.strptime(value)
|
|
except Exception:
|
|
return None
|
|
else:
|
|
return None
|
|
|
|
millis_timestamp, offset_sign, offset = match.groups()
|
|
if offset:
|
|
if offset_sign == "+":
|
|
offset_sign = 1
|
|
else:
|
|
offset_sign = -1
|
|
offset_hours = offset_sign * int(offset[:2])
|
|
offset_minutes = offset_sign * int(offset[2:])
|
|
else:
|
|
offset_hours = 0
|
|
offset_minutes = 0
|
|
|
|
return datetime.fromtimestamp((int(millis_timestamp) / 1000), tz=timezone.utc) + timedelta(
|
|
hours=offset_hours, minutes=offset_minutes
|
|
)
|
|
|
|
@staticmethod
|
|
def convert_dates(obj):
|
|
if isinstance(obj, dict):
|
|
for key, value in obj.items():
|
|
if isinstance(value, str):
|
|
parsed_value = ParseDates.parse_date(value)
|
|
if parsed_value:
|
|
if type(parsed_value) == date:
|
|
parsed_value = datetime.combine(parsed_value, time.min)
|
|
parsed_value = parsed_value.replace(tzinfo=timezone.utc)
|
|
obj[key] = datetime.isoformat(parsed_value, timespec="seconds")
|
|
elif isinstance(value, (dict, list)):
|
|
ParseDates.convert_dates(value)
|
|
elif isinstance(obj, list):
|
|
for i in range(len(obj)):
|
|
if isinstance(obj[i], (dict, list)):
|
|
ParseDates.convert_dates(obj[i])
|
|
|
|
|
|
@dataclass
|
|
class CustomExtractor(RecordExtractor):
|
|
field_path: List[Union[InterpolatedString, str]]
|
|
config: Config
|
|
parameters: InitVar[Mapping[str, Any]]
|
|
decoder: Decoder = JsonDecoder(parameters={})
|
|
|
|
def __post_init__(self, parameters: Mapping[str, Any]):
|
|
for path_index in range(len(self.field_path)):
|
|
if isinstance(self.field_path[path_index], str):
|
|
self.field_path[path_index] = InterpolatedString.create(self.field_path[path_index], parameters=parameters)
|
|
|
|
def extract_records(self, response: requests.Response) -> List[Mapping[str, Any]]:
|
|
response_body = self.decoder.decode(response)
|
|
if len(self.field_path) == 0:
|
|
extracted = response_body
|
|
else:
|
|
path = [path.eval(self.config) for path in self.field_path]
|
|
if "*" in path:
|
|
extracted = dpath.util.values(response_body, path)
|
|
else:
|
|
extracted = dpath.util.get(response_body, path, default=[])
|
|
|
|
ParseDates.convert_dates(extracted)
|
|
|
|
if isinstance(extracted, list):
|
|
return extracted
|
|
elif extracted:
|
|
return [extracted]
|
|
else:
|
|
return []
|