1
0
mirror of synced 2025-12-20 18:39:31 -05:00
Files
airbyte/airbyte-integrations/connectors/source-rss/source_rss/components.py
2025-09-04 14:53:42 -04:00

69 lines
2.1 KiB
Python

#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
import datetime
from calendar import timegm
from datetime import datetime
from typing import Any, List, Mapping
import feedparser
import pytz
import requests
from dateutil.parser import parse
from airbyte_cdk.sources.declarative.extractors.record_extractor import RecordExtractor
class CustomExtractor(RecordExtractor):
    """Record extractor that parses an RSS response body with feedparser."""

    def extract_records(self, response: requests.Response, **kwargs) -> List[Mapping[str, Any]]:
        """Parse ``response.text`` as a feed and return its entries as mappings.

        Entries are emitted in the reverse of the order feedparser yields
        them. Each mapping holds whichever of the known item keys the entry
        provides, plus a ``published`` ISO-8601 UTC timestamp when the entry
        carries a usable ``published_parsed`` value.

        Args:
            response: HTTP response whose text body is the feed document.
            **kwargs: Accepted for interface compatibility; not used here.

        Returns:
            The list of item mappings that pass the ``is_newer`` filter.
        """
        item_keys = (
            "title",
            "link",
            "description",
            "author",
            "category",
            "comments",
            "enclosure",
            "guid",
        )

        def convert_item_to_mapping(item) -> Mapping:
            """Copy the known keys of a feed entry into a plain dict."""
            mapping = {}
            for item_key in item_keys:
                try:
                    mapping[item_key] = item[item_key]
                except (AttributeError, KeyError):
                    # The entry simply does not provide this field.
                    pass
            try:
                # published_parsed is a UTC time tuple; timegm converts it to
                # epoch seconds without applying the local timezone.
                published_at = datetime.fromtimestamp(timegm(item.published_parsed), tz=pytz.UTC)
            except (AttributeError, KeyError, TypeError, ValueError, OverflowError, OSError):
                # Missing date, a date that could not be parsed (None), or an
                # out-of-range value: emit the record without "published"
                # rather than failing the whole extraction.
                pass
            else:
                mapping["published"] = published_at.isoformat()
            return mapping

        def is_newer(item, initial_state_date) -> bool:
            """Return True unless the item is dated at or before the cutoff."""
            if initial_state_date is None:
                return True
            try:
                current_record_date = parse(item["published"])
            except Exception:
                # Best effort: an undated or unparseable item is always kept.
                return True
            return current_record_date > initial_state_date

        feed = feedparser.parse(response.text)
        try:
            # NOTE(review): this reads "published" from the top-level parse
            # result, not from feed.feed -- confirm this is the intended key.
            initial_state_date = parse(feed["published"])
        except Exception:
            # No usable cutoff date: every item is treated as new.
            initial_state_date = None

        all_item_mappings = [convert_item_to_mapping(item) for item in reversed(feed.entries)]
        return [item for item in all_item_mappings if is_newer(item, initial_state_date)]