Source S3: maintain backwards compatibility between V3 & V4 state messages (#29028)
This commit is contained in:
@@ -28,6 +28,7 @@ class IncrementalFileStreamS3(IncrementalFileStream):
|
||||
"""
|
||||
:yield: url filepath to use in S3File()
|
||||
"""
|
||||
stream_state = self._get_converted_stream_state(stream_state)
|
||||
prefix = self._provider.get("path_prefix")
|
||||
if prefix is None:
|
||||
prefix = ""
|
||||
@@ -66,7 +67,7 @@ class IncrementalFileStreamS3(IncrementalFileStream):
|
||||
pass
|
||||
else:
|
||||
for file in content:
|
||||
if self.is_not_folder(file) and self.filter_by_last_modified_date(file, stream_state):
|
||||
if self.is_not_folder(file) and self._filter_by_last_modified_date(file, stream_state):
|
||||
yield FileInfo(key=file["Key"], last_modified=file["LastModified"], size=file["Size"])
|
||||
ctoken = response.get("NextContinuationToken", None)
|
||||
if not ctoken:
|
||||
@@ -76,7 +77,7 @@ class IncrementalFileStreamS3(IncrementalFileStream):
|
||||
def is_not_folder(file) -> bool:
|
||||
return not file["Key"].endswith("/")
|
||||
|
||||
def filter_by_last_modified_date(self, file: Mapping[str, Any] = None, stream_state: Mapping[str, Any] = None):
|
||||
def _filter_by_last_modified_date(self, file: Mapping[str, Any] = None, stream_state: Mapping[str, Any] = None):
|
||||
cursor_date = pendulum.parse(stream_state.get(self.cursor_field)) if stream_state else self.start_date
|
||||
|
||||
file_in_history_and_last_modified_is_earlier_than_cursor_value = (
|
||||
|
||||
Reference in New Issue
Block a user