1
0
mirror of synced 2025-12-22 03:21:25 -05:00
Files
airbyte/airbyte-integrations/connectors/source-xero/components.py
2025-02-05 07:03:31 -08:00

105 lines
3.9 KiB
Python

#
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
#
import re
from dataclasses import InitVar, dataclass, field
from datetime import date, datetime, time, timedelta, timezone
from typing import Any, Dict, List, Mapping, Union

import dpath.util
import requests
from airbyte_cdk.sources.declarative.decoders.decoder import Decoder
from airbyte_cdk.sources.declarative.decoders.json_decoder import JsonDecoder
from airbyte_cdk.sources.declarative.extractors.record_extractor import RecordExtractor
from airbyte_cdk.sources.declarative.interpolation import InterpolatedString
from airbyte_cdk.sources.declarative.types import Config
class ParseDates:
    """Helpers that rewrite Xero date strings as ISO 8601 text.

    Two input shapes are recognised:

    * .NET JSON dates such as ``"/Date(1419937200000+0000)/"``
    * plain ``YYYY-MM-DDTHH:MM:SS`` strings
    """

    # Xero datetimes can be .NET JSON date strings which look like
    # "/Date(1419937200000+0000)/": epoch milliseconds followed by an
    # optional signed offset.
    # https://developer.xero.com/documentation/api/requests-and-responses
    _DOTNET_DATE = re.compile(r"Date\((\-?\d+)([-+])?(\d+)?\)")
    # Cheap pre-filter so that strptime is only attempted on strings that
    # contain something shaped like a timestamp.
    _ISO8601 = re.compile(r"((\d{4})-([0-2]\d)-0?([0-3]\d)T([0-5]\d):([0-5]\d):([0-6]\d))")
    _ISO8601_FORMAT = "%Y-%m-%dT%H:%M:%S"

    @staticmethod
    def parse_date(value: str) -> Union[datetime, None]:
        """Parse a Xero date string.

        Args:
            value: The string to inspect.

        Returns:
            A timezone-aware (UTC) ``datetime`` for a .NET JSON date, a naive
            ``datetime`` for a string that is exactly ``YYYY-MM-DDTHH:MM:SS``,
            or ``None`` when ``value`` is not a recognised date.
        """
        match = ParseDates._DOTNET_DATE.search(value)
        if not match:
            if not ParseDates._ISO8601.search(value):
                return None
            try:
                # The whole string must be a timestamp; text that merely
                # contains one (or has fractional seconds / an offset) is
                # left alone by returning None.
                return datetime.strptime(value, ParseDates._ISO8601_FORMAT)
            except ValueError:
                return None

        millis_timestamp, offset_sign, offset = match.groups()
        if offset:
            sign = 1 if offset_sign == "+" else -1
            offset_hours = sign * int(offset[:2])
            # An offset of two digits or fewer has no minutes component.
            offset_minutes = sign * int(offset[2:] or 0)
        else:
            offset_hours = 0
            offset_minutes = 0

        # The offset is added to the UTC instant, so the result holds the
        # shifted clock reading while still being labelled as UTC.
        utc_instant = datetime.fromtimestamp(int(millis_timestamp) / 1000, tz=timezone.utc)
        return utc_instant + timedelta(hours=offset_hours, minutes=offset_minutes)

    @staticmethod
    def convert_dates(obj: Any) -> None:
        """Recursively replace date strings in ``obj`` with ISO 8601 text, in place.

        String values of dicts that ``parse_date`` recognises are overwritten
        with ``YYYY-MM-DDTHH:MM:SS+00:00``. Nested dicts and lists are walked
        recursively. Strings that sit directly inside a list are not touched,
        and any other type of ``obj`` is ignored.

        Args:
            obj: The decoded payload (dict, list, or anything else).
        """
        if isinstance(obj, dict):
            # Only existing keys are reassigned, so the dict does not change
            # size while it is being iterated.
            for key, value in obj.items():
                if isinstance(value, str):
                    parsed_value = ParseDates.parse_date(value)
                    if parsed_value is None:
                        continue
                    # datetime subclasses date, so exclude it explicitly when
                    # checking for a bare date.
                    if isinstance(parsed_value, date) and not isinstance(parsed_value, datetime):
                        parsed_value = datetime.combine(parsed_value, time.min)
                    parsed_value = parsed_value.replace(tzinfo=timezone.utc)
                    obj[key] = parsed_value.isoformat(timespec="seconds")
                elif isinstance(value, (dict, list)):
                    ParseDates.convert_dates(value)
        elif isinstance(obj, list):
            for item in obj:
                if isinstance(item, (dict, list)):
                    ParseDates.convert_dates(item)
@dataclass
class CustomExtractor(RecordExtractor):
    """Record extractor that selects records by path and normalises their dates.

    The decoded response body is narrowed with ``field_path`` (via ``dpath``)
    and the selected structure is passed through ``ParseDates.convert_dates``
    before being returned as a list.
    """

    # Path segments leading to the records; plain strings are turned into
    # InterpolatedString objects in __post_init__.
    field_path: List[Union[InterpolatedString, str]]
    # Passed to each path segment's eval() when the path is resolved.
    config: Config
    # Forwarded to InterpolatedString.create for string path segments.
    parameters: InitVar[Mapping[str, Any]]
    # A factory gives each extractor its own decoder instead of sharing one
    # instance created at class-definition time; dataclasses on Python 3.11+
    # reject unhashable instances used directly as field defaults.
    decoder: Decoder = field(default_factory=lambda: JsonDecoder(parameters={}))

    def __post_init__(self, parameters: Mapping[str, Any]):
        """Replace string entries of ``field_path`` with interpolated strings, in place."""
        for index, segment in enumerate(self.field_path):
            if isinstance(segment, str):
                self.field_path[index] = InterpolatedString.create(segment, parameters=parameters)

    def extract_records(self, response: requests.Response) -> List[Mapping[str, Any]]:
        """Return the records found in ``response``.

        Args:
            response: The HTTP response handed to ``self.decoder.decode``.

        Returns:
            The selected value if it is a list, a one-element list if it is
            any other truthy value, and an empty list otherwise.
        """
        # NOTE(review): assumes decode() returns the parsed body itself
        # (dict or list) rather than an iterator - confirm against the CDK version.
        response_body = self.decoder.decode(response)
        if not self.field_path:
            extracted = response_body
        else:
            path = [segment.eval(self.config) for segment in self.field_path]
            if "*" in path:
                # A wildcard segment can match several locations.
                extracted = dpath.util.values(response_body, path)
            else:
                extracted = dpath.util.get(response_body, path, default=[])

        # Rewrites recognised date strings inside the selection in place.
        ParseDates.convert_dates(extracted)

        if isinstance(extracted, list):
            return extracted
        return [extracted] if extracted else []