1
0
mirror of synced 2025-12-21 19:11:14 -05:00
Files
airbyte/airbyte-integrations/connectors/source-s3/source_s3/exceptions.py

35 lines
1.1 KiB
Python

#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
from typing import List, Optional, Union
from airbyte_cdk import AirbyteTracedException, FailureType
from .source_files_abstract.file_info import FileInfo
class S3Exception(AirbyteTracedException):
def __init__(
self,
file_info: Union[List[FileInfo], FileInfo],
internal_message: Optional[str] = None,
message: Optional[str] = None,
failure_type: FailureType = FailureType.system_error,
exception: BaseException = None,
):
file_info = (
file_info
if isinstance(file_info, (list, tuple))
else [
file_info,
]
)
file_names = ", ".join([file.key for file in file_info])
user_friendly_message = f"""
The connector encountered an error while processing the file(s): {file_names}.
{message}
This can be an input configuration error as well, please double check your connection settings.
"""
super().__init__(internal_message=internal_message, message=user_friendly_message, failure_type=failure_type, exception=exception)