35 lines
1.1 KiB
Python
35 lines
1.1 KiB
Python
#
|
|
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
|
|
#
|
|
|
|
from typing import List, Optional, Union
|
|
|
|
from airbyte_cdk import AirbyteTracedException, FailureType
|
|
|
|
from .source_files_abstract.file_info import FileInfo
|
|
|
|
|
|
class S3Exception(AirbyteTracedException):
|
|
def __init__(
|
|
self,
|
|
file_info: Union[List[FileInfo], FileInfo],
|
|
internal_message: Optional[str] = None,
|
|
message: Optional[str] = None,
|
|
failure_type: FailureType = FailureType.system_error,
|
|
exception: BaseException = None,
|
|
):
|
|
file_info = (
|
|
file_info
|
|
if isinstance(file_info, (list, tuple))
|
|
else [
|
|
file_info,
|
|
]
|
|
)
|
|
file_names = ", ".join([file.key for file in file_info])
|
|
user_friendly_message = f"""
|
|
The connector encountered an error while processing the file(s): {file_names}.
|
|
{message}
|
|
This can be an input configuration error as well, please double check your connection settings.
|
|
"""
|
|
super().__init__(internal_message=internal_message, message=user_friendly_message, failure_type=failure_type, exception=exception)
|