1
0
mirror of synced 2025-12-21 11:01:41 -05:00
Files
airbyte/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/utils.py
Denys Davydov f8c3e765a6 🐍 Source Mixpanel: certification preparations (#30149)
Co-authored-by: Anatolii Yatsuk <tolikyatsuk@gmail.com>
2023-09-27 15:04:38 +03:00

64 lines
1.9 KiB
Python

#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
import re
from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.streams import Stream
def read_full_refresh(stream_instance: Stream):
slices = stream_instance.stream_slices(sync_mode=SyncMode.full_refresh)
for _slice in slices:
records = stream_instance.read_records(stream_slice=_slice, sync_mode=SyncMode.full_refresh)
for record in records:
yield record
# Precompile the regex pattern.
ISO_FORMAT_PATTERN = re.compile(r"^(\d{4}-\d{2}-\d{2})[ t](\d{2}:\d{2}:\d{2})$")
def to_iso_format(s: str) -> str:
"""
Convert a date string to ISO format if it matches recognized patterns.
Args:
- s (str): Input string to be converted.
Returns:
- str: Converted string in ISO format or the original string if no recognized pattern is found.
"""
# Use the precompiled regex pattern to match the date format.
match = ISO_FORMAT_PATTERN.match(s)
if match:
return match.group(1) + "T" + match.group(2)
return s
def fix_date_time(record):
"""
Recursively process a data structure to fix date and time formats.
Args:
- record (dict or list): The input data structure, which can be a dictionary or a list.
Returns:
- None: The function modifies the input data structure in place.
"""
# Define the list of fields that might contain date and time values.
date_time_fields = {"last_seen", "created", "last_authenticated"}
if isinstance(record, dict):
for field, value in list(record.items()): # Convert to list to avoid runtime errors during iteration.
if field in date_time_fields and isinstance(value, str):
record[field] = to_iso_format(value)
elif isinstance(value, (dict, list)):
fix_date_time(value)
elif isinstance(record, list):
for entry in record:
fix_date_time(entry)