# Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com>
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
|
|
|
|
import re
|
|
from dataclasses import InitVar, dataclass
|
|
from datetime import datetime as dt
|
|
from typing import Any, List, Mapping, Optional, Tuple
|
|
|
|
from airbyte_cdk.sources.declarative.schema import JsonFileSchemaLoader
|
|
from airbyte_cdk.sources.declarative.transformations import RecordTransformation
|
|
from airbyte_cdk.sources.declarative.types import Config, Record, StreamSlice, StreamState
|
|
|
|
|
|
class ParserError(Exception):
    """Signal that a date or date-time string could not be parsed.

    Stands in for pendulum's ``ParserError`` so the transformation in this module
    can keep catching a dedicated parsing exception without depending on pendulum.
    """
|
|
|
|
|
|
@dataclass
class CustomFieldTransformation(RecordTransformation):
    """
    Null out 'date' and 'date-time' fields whose values cannot be parsed as dates.

    The fields to check are the root-level properties of the stream's JSON schema
    (loaded by stream ``name``) whose ``format`` is ``date`` or ``date-time``. Any such
    field holding a truthy value that ``parse`` rejects -- including "empty" values
    such as '0000-00-00' or '0000-00-00 00:00:00' -- is set to ``None`` in the record.
    """

    config: Config
    parameters: InitVar[Optional[Mapping[str, Any]]] = None

    # Unannotated class attributes, so @dataclass does not treat them as fields.

    # "Zero" dates such as '0000-00-00' or '0000-00-00 00:00:00'.
    _ZERO_DATE_PATTERN = re.compile(r"^0+[-]0+[-]0+")
    # A run of 7+ fractional-second digits followed by an offset sign or end of text.
    # Applied after 'Z' has been rewritten to '+00:00', so only '+'/'-' can follow.
    # (The previous class `[+-Z]` was a character *range* '+'..'Z' that also matched
    # digits, ':' and '.', letting backtracking split the digit run.)
    _LONG_FRACTION_PATTERN = re.compile(r"\.(\d{7,})(?=[+-]|$)")

    # strptime formats tried, in order, for text that does NOT end with a literal 'Z'.
    _FORMATS_WITHOUT_Z = (
        # Basic formats
        "%Y-%m-%d",
        "%Y/%m/%d",
        "%d-%m-%Y",
        "%d/%m/%Y",
        # Date and time formats
        "%Y-%m-%d %H:%M:%S",
        "%Y-%m-%d %H:%M:%S.%f",
        "%Y/%m/%d %H:%M:%S",
        "%Y/%m/%d %H:%M:%S.%f",
        # ISO formats
        "%Y-%m-%dT%H:%M:%S",
        "%Y-%m-%dT%H:%M:%S.%f",
        # With numeric timezone offset
        "%Y-%m-%d %H:%M:%S%z",
        "%Y-%m-%d %H:%M:%S.%f%z",
        "%Y-%m-%dT%H:%M:%S%z",
        "%Y-%m-%dT%H:%M:%S.%f%z",
    )
    # strptime formats tried, in order, for text ending with a literal 'Z' (UTC).
    _FORMATS_WITH_Z = (
        "%Y-%m-%dT%H:%M:%SZ",
        "%Y-%m-%dT%H:%M:%S.%fZ",
    )

    def __post_init__(self, parameters: Optional[Mapping[str, Any]] = None):
        """Resolve the stream name from ``parameters`` and collect its date fields."""
        # Handle the case when parameters is None
        parameters = parameters or {}
        self.name = parameters.get("name")

        if self.name is None:
            # Without a stream name there is no schema to load, so no field is checked.
            self._schema = {}
            self._date_and_date_time_fields = []
        else:
            self._schema = self._get_schema_root_properties()
            self._date_and_date_time_fields = self._get_fields_with_property_formats_from_schema(("date", "date-time"))

    def _get_schema_root_properties(self):
        """Return the root ``properties`` mapping of the stream's JSON schema ({} if none)."""
        if not self.name:
            return {}

        schema_loader = JsonFileSchemaLoader(config=self.config, parameters={"name": self.name})
        schema = schema_loader.get_json_schema()
        return schema.get("properties", {})

    def _get_fields_with_property_formats_from_schema(self, property_formats: Tuple[str, ...]) -> List[str]:
        """
        Get all properties from schema within property_formats
        """
        return [k for k, v in self._schema.items() if v.get("format") in property_formats]

    def parse(self, text):
        """
        Parse ``text`` into a ``datetime``; a stand-in for ``pendulum.parse``.

        Tries a fixed list of ``strptime`` formats first, then falls back to
        ``datetime.fromisoformat``.

        :param text: the raw field value (expected to be a ``str``).
        :return: the parsed ``datetime``.
        :raises ParserError: for "zero" dates like '0000-00-00', and for text that
            no format accepts.
        """
        # Reject dates with zeros like '0000-00-00' or '0000-00-00 00:00:00'
        if self._ZERO_DATE_PATTERN.match(text):
            raise ParserError("Zero date not allowed")

        # A trailing 'Z' is only accepted by formats that spell it out literally.
        formats = self._FORMATS_WITH_Z if text.endswith("Z") else self._FORMATS_WITHOUT_Z
        for fmt in formats:
            try:
                # datetime cannot represent a 0 year/month/day, so strptime itself
                # raises ValueError for such components; no extra check is needed.
                return dt.strptime(text, fmt)
            except ValueError:
                continue

        # Try ISO format as a last resort
        parsed = self._parse_isoformat(text)
        if parsed is not None:
            return parsed

        # If nothing worked, raise the error like pendulum would
        raise ParserError(f"Unable to parse: {text}")

    def _parse_isoformat(self, text):
        """Parse ``text`` with ``datetime.fromisoformat``; return ``None`` if it is rejected."""
        try:
            # fromisoformat accepts 'Z' only from Python 3.11, so spell UTC as an offset.
            iso_text = text.replace("Z", "+00:00")

            # For Python < 3.11 compatibility, truncate fractional seconds longer than
            # 6 digits to microsecond precision.
            match = self._LONG_FRACTION_PATTERN.search(iso_text)
            if match:
                iso_text = iso_text[: match.start(1)] + match.group(1)[:6] + iso_text[match.end(1) :]

            return dt.fromisoformat(iso_text)
        except (ValueError, AttributeError):
            return None

    def transform(
        self,
        record: Record,
        config: Optional[Config] = None,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
    ) -> Record:
        """
        Set each date/date-time field of ``record`` that ``parse`` rejects to ``None``.

        The record is modified in place and also returned. Fields missing from the
        record or holding a falsy value are left untouched.
        """
        # Walk only the schema's date fields instead of testing every record key
        # against the list; an empty list simply leaves the record unchanged.
        for field in self._date_and_date_time_fields:
            if not record.get(field):
                continue
            try:
                self.parse(record[field])
            except ParserError:
                record[field] = None
        return record
|