1
0
mirror of synced 2025-12-21 19:11:14 -05:00
Files
airbyte/airbyte-integrations/connectors/source-s3/source_s3/stream.py
2024-12-18 14:05:43 -08:00

97 lines
4.1 KiB
Python

#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
from datetime import timedelta
from typing import Any, Iterator, Mapping
import pendulum
from boto3 import session as boto3session
from botocore import UNSIGNED
from botocore.config import Config
from botocore.exceptions import ClientError
from airbyte_cdk import AirbyteTracedException, FailureType
from source_s3.s3_utils import make_s3_client
from .s3file import S3File
from .source_files_abstract.file_info import FileInfo
from .source_files_abstract.stream import IncrementalFileStream
class IncrementalFileStreamS3(IncrementalFileStream):
    """Incremental file stream that discovers objects in an S3 bucket.

    Objects are listed page by page with ``list_objects_v2`` and filtered
    against the stream state; matching objects are read through ``S3File``.
    """

    @property
    def storagefile_class(self) -> type:
        """Storage-file class used to open the discovered objects."""
        return S3File

    def filepath_iterator(self, stream_state: Mapping[str, Any] = None) -> Iterator[FileInfo]:
        """Yield a FileInfo for every object in the bucket that should be synced.

        Objects are listed under the configured ``path_prefix`` (treated as ""
        when missing or None). Keys ending in "/" are skipped via
        ``is_not_folder`` and the rest are filtered by
        ``_filter_by_last_modified_date``.

        :param stream_state: incoming stream state; converted with
            ``_get_converted_stream_state`` before use.
        :yield: FileInfo (key, last-modified timestamp, size) used to build S3File().
        :raises AirbyteTracedException: with ``FailureType.config_error`` when
            S3 rejects the listing request (botocore ``ClientError``).
        """
        stream_state = self._get_converted_stream_state(stream_state)
        provider = self._provider

        # "path_prefix" may be absent or explicitly None. boto3 rejects
        # Prefix=None, so normalize once and reuse it for every page request.
        prefix = provider.get("path_prefix")
        if prefix is None:
            prefix = ""

        msg = f"Iterating S3 bucket '{provider['bucket']}'"
        self.logger.info(msg + f" with prefix: '{prefix}' " if prefix != "" else msg)

        client_config = None
        if S3File.use_aws_account(provider):
            session = boto3session.Session(
                aws_access_key_id=provider["aws_access_key_id"], aws_secret_access_key=provider["aws_secret_access_key"]
            )
        else:
            # Otherwise requests are sent unsigned (signature_version=UNSIGNED), i.e. anonymously.
            session = boto3session.Session()
            client_config = Config(signature_version=UNSIGNED)
        client = make_s3_client(provider, config=client_config, session=session)

        # list_objects_v2 doesn't accept ContinuationToken=None, so the key is
        # only added once S3 has returned a token for the next page.
        list_kwargs = {"Bucket": provider["bucket"], "Prefix": prefix}
        while True:
            try:
                response = client.list_objects_v2(**list_kwargs)
            except ClientError as e:
                # Prefer S3's own error message; fall back to the exception text
                # rather than passing a non-string to the traced exception.
                message = e.response.get("Error", {}).get("Message") or str(e)
                raise AirbyteTracedException(message, message, failure_type=FailureType.config_error) from e

            # "Contents" is omitted from the response when a page holds no objects.
            for file in response.get("Contents", []):
                if self.is_not_folder(file) and self._filter_by_last_modified_date(file, stream_state):
                    yield FileInfo(key=file["Key"], last_modified=file["LastModified"], size=file["Size"])

            ctoken = response.get("NextContinuationToken")
            if not ctoken:
                break
            list_kwargs["ContinuationToken"] = ctoken

    @staticmethod
    def is_not_folder(file) -> bool:
        """Return True unless the object's key ends with "/" (treated as a folder)."""
        return not file["Key"].endswith("/")

    def _filter_by_last_modified_date(self, file: Mapping[str, Any] = None, stream_state: Mapping[str, Any] = None):
        """Decide whether a listed S3 object should be synced given the stream state.

        A file is kept only when all of the following hold:
          * it was modified after the cursor date (the state's cursor value, or
            ``self.start_date`` when the state has no cursor value);
          * it is NOT a file already in the state history whose modification
            time is at or before the state's cursor datetime;
          * it is NOT a file missing from the history whose modification time
            plus ``self.buffer_days`` is still before the state's cursor datetime.

        :param file: object entry from ``list_objects_v2`` (reads "Key" and "LastModified").
        :param stream_state: converted stream state; may be None or empty.
        :return: True if the file should be yielded.
        """
        last_modified = file.get("LastModified")

        # Fall back to start_date when the state lacks a cursor value instead of
        # calling pendulum.parse(None); the history check below already allows
        # for a state without the cursor field.
        cursor_value = stream_state.get(self.cursor_field) if stream_state else None
        cursor_date = pendulum.parse(cursor_value) if cursor_value else self.start_date

        state_datetime = self._get_datetime_from_stream_state(stream_state)
        # A None state has no history; avoid calling .get on None.
        history = stream_state.get("history", {}) if stream_state is not None else {}

        in_history_and_not_after_cursor = (
            stream_state is not None
            and self.cursor_field in stream_state.keys()
            and last_modified <= state_datetime
            and self.file_in_history(file["Key"], history)
        )
        not_in_history_and_older_than_buffer = last_modified + timedelta(days=self.buffer_days) < state_datetime and not self.file_in_history(
            file["Key"], history
        )
        return last_modified > cursor_date and not in_history_and_not_after_cursor and not not_in_history_and_older_than_buffer