chore(source-gcs) update to airbyte-cdk v7 (#66671)
Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com>
Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
@@ -7,7 +7,7 @@ from typing import Literal, Union
 from pydantic.v1 import AnyUrl, BaseModel, Field
 
-from airbyte_cdk.sources.file_based.config.abstract_file_based_spec import AbstractFileBasedSpec
+from airbyte_cdk.sources.file_based.config.abstract_file_based_spec import AbstractFileBasedSpec, DeliverRawFiles, DeliverRecords
 from airbyte_cdk.utils.oneof_option_config import OneOfOptionConfig
@@ -71,6 +71,17 @@ class Config(AbstractFileBasedSpec, BaseModel):
     bucket: str = Field(title="Bucket", description="Name of the GCS bucket where the file(s) exist.", order=2)
 
+    delivery_method: Union[DeliverRecords, DeliverRawFiles] = Field(
+        title="Delivery Method",
+        discriminator="delivery_type",
+        type="object",
+        order=3,
+        display_type="radio",
+        group="advanced",
+        default="use_records_transfer",
+        airbyte_hidden=True,
+    )
+
     @classmethod
     def documentation_url(cls) -> AnyUrl:
         """
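
Not part of the diff: a minimal, hypothetical sketch of how a discriminated Union field like delivery_method resolves under pydantic v1. The DeliverRecords/DeliverRawFiles stand-ins and the "use_file_transfer" literal below are simplified assumptions, not the CDK classes, and the default is an instance here rather than the string default used in the spec.

    from typing import Literal, Union
    from pydantic.v1 import BaseModel, Field

    class DeliverRecords(BaseModel):
        # Stand-in for the CDK model; the real class carries more metadata.
        delivery_type: Literal["use_records_transfer"] = "use_records_transfer"

    class DeliverRawFiles(BaseModel):
        # Stand-in for the CDK model.
        delivery_type: Literal["use_file_transfer"] = "use_file_transfer"

    class ExampleConfig(BaseModel):
        # The discriminator tells pydantic which Union member to build
        # based on the incoming delivery_type value.
        delivery_method: Union[DeliverRecords, DeliverRawFiles] = Field(
            default=DeliverRecords(), discriminator="delivery_type"
        )

    cfg = ExampleConfig.parse_obj({"delivery_method": {"delivery_type": "use_file_transfer"}})
    assert isinstance(cfg.delivery_method, DeliverRawFiles)
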
@@ -7,16 +7,16 @@ import logging
 from datetime import datetime
 
 from airbyte_cdk.sources.file_based.stream.cursor import DefaultFileBasedCursor
-from source_gcs.helpers import GCSRemoteFile
+from source_gcs.helpers import GCSUploadableRemoteFile
 
 
 class Cursor(DefaultFileBasedCursor):
     @staticmethod
-    def get_file_uri(file: GCSRemoteFile) -> str:
+    def get_file_uri(file: GCSUploadableRemoteFile) -> str:
         file_uri = file.displayed_uri if file.displayed_uri else file.uri
         return file_uri.split("?")[0]
 
-    def add_file(self, file: GCSRemoteFile) -> None:
+    def add_file(self, file: GCSUploadableRemoteFile) -> None:
         uri = self.get_file_uri(file)
         self._file_to_datetime_history[uri] = file.last_modified.strftime(self.DATE_TIME_FORMAT)
         if len(self._file_to_datetime_history) > self.DEFAULT_MAX_HISTORY_SIZE:
@@ -29,7 +29,7 @@ class Cursor(DefaultFileBasedCursor):
                 "The history is full but there is no files in the history. This should never happen and might be indicative of a bug in the CDK."
             )
 
-    def _should_sync_file(self, file: GCSRemoteFile, logger: logging.Logger) -> bool:
+    def _should_sync_file(self, file: GCSUploadableRemoteFile, logger: logging.Logger) -> bool:
         uri = self.get_file_uri(file)
         if uri in self._file_to_datetime_history:
             # If the file's uri is in the history, we should sync the file if it has been modified since it was synced
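
For orientation (not in the diff): Cursor.get_file_uri keys its history on the URL with the query string stripped, so re-signed URLs for the same object map to the same history entry. A tiny illustrative check with a made-up signed URL:

    # Hypothetical signed URL; only the path portion is used as the history key.
    signed = "https://storage.googleapis.com/my-bucket/data.csv?X-Goog-Algorithm=GOOG4-RSA-SHA256&X-Goog-Expires=604800"
    assert signed.split("?")[0] == "https://storage.googleapis.com/my-bucket/data.csv"
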
@@ -4,11 +4,15 @@
 
 import json
+import urllib.parse
 from datetime import timedelta
+from typing import Any
 
 import pytz
 from google.cloud import storage
 from google.oauth2 import credentials, service_account
 
-from airbyte_cdk.sources.file_based.remote_file import RemoteFile
+from airbyte_cdk.sources.file_based.remote_file import UploadableRemoteFile
 
 
 def get_gcs_client(config):
@@ -44,10 +48,34 @@ def get_stream_name(blob):
     return stream_name
 
 
-class GCSRemoteFile(RemoteFile):
+class GCSUploadableRemoteFile(UploadableRemoteFile):
     """
     Extends RemoteFile instance with displayed_uri attribute.
     displayed_uri is being used by Cursor to identify files with temporal local path in their uri attribute.
     """
 
+    blob: Any
     displayed_uri: str = None
 
+    def __init__(self, blob: Any, displayed_uri: str = None, **kwargs):
+        super().__init__(**kwargs)
+        self.blob = blob
+        self.displayed_uri = displayed_uri
+        self.id = self.blob.id
+        self.created_at = self.blob.time_created.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
+        self.updated_at = self.blob.updated.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
+
+    @property
+    def size(self) -> int:
+        return self.blob.size
+
+    def download_to_local_directory(self, local_file_path: str) -> None:
+        self.blob.download_to_filename(local_file_path)
+
+    @property
+    def source_file_relative_path(self) -> str:
+        return urllib.parse.unquote(self.blob.path)
+
+    @property
+    def file_uri_for_logging(self) -> str:
+        return urllib.parse.unquote(self.blob.path)
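
As context (not in the diff): the new class delegates to a handful of real google-cloud-storage Blob attributes (id, path, size, time_created, updated, download_to_filename). A standalone sketch of those attributes against a hypothetical bucket; it assumes ambient GCP credentials and an existing object:

    from google.cloud import storage

    client = storage.Client()
    blob = client.bucket("my-bucket").get_blob("data.csv")  # hypothetical bucket/object
    print(blob.id, blob.size, blob.time_created, blob.updated)
    print(blob.path)                        # URL-encoded object path, e.g. "/b/my-bucket/o/data.csv"
    blob.download_to_filename("/tmp/data.csv")
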
@@ -7,7 +7,12 @@ from typing import Any, Mapping, Optional
 
 from airbyte_cdk import emit_configuration_as_airbyte_control_message
 from airbyte_cdk.models import AdvancedAuth, AuthFlowType, ConnectorSpecification, OAuthConfigSpecification
+from airbyte_cdk.sources.file_based.config.abstract_file_based_spec import AbstractFileBasedSpec
 from airbyte_cdk.sources.file_based.config.file_based_stream_config import FileBasedStreamConfig
+from airbyte_cdk.sources.file_based.config.validate_config_transfer_modes import (
+    preserve_directory_structure,
+    use_file_transfer,
+)
 from airbyte_cdk.sources.file_based.file_based_source import FileBasedSource
 from airbyte_cdk.sources.file_based.stream import AbstractFileBasedStream
 from airbyte_cdk.sources.file_based.stream.cursor import AbstractFileBasedCursor
@@ -67,7 +72,10 @@ class SourceGCS(FileBasedSource):
         )
 
     def _make_default_stream(
-        self, stream_config: FileBasedStreamConfig, cursor: Optional[AbstractFileBasedCursor]
+        self,
+        stream_config: FileBasedStreamConfig,
+        cursor: Optional[AbstractFileBasedCursor],
+        parsed_config: AbstractFileBasedSpec,
     ) -> AbstractFileBasedStream:
         return GCSStream(
             config=stream_config,
@@ -79,4 +87,6 @@ class SourceGCS(FileBasedSource):
             validation_policy=self._validate_and_get_validation_policy(stream_config),
             errors_collector=self.errors_collector,
             cursor=cursor,
+            use_file_transfer=use_file_transfer(parsed_config),
+            preserve_directory_structure=preserve_directory_structure(parsed_config),
         )
@@ -3,11 +3,11 @@
 from typing import Any
 
 from airbyte_cdk.sources.file_based.stream import DefaultFileBasedStream
-from source_gcs.helpers import GCSRemoteFile
+from source_gcs.helpers import GCSUploadableRemoteFile
 
 
 class GCSStream(DefaultFileBasedStream):
-    def transform_record(self, record: dict[str, Any], file: GCSRemoteFile, last_updated: str) -> dict[str, Any]:
+    def transform_record(self, record: dict[str, Any], file: GCSUploadableRemoteFile, last_updated: str) -> dict[str, Any]:
         record[self.ab_last_mod_col] = last_updated
         record[self.ab_file_name_col] = file.displayed_uri if file.displayed_uri else file.uri
         return record
@@ -5,6 +5,7 @@ import itertools
 import json
 import logging
 import tempfile
+import urllib.parse
 from datetime import datetime, timedelta
 from io import IOBase, StringIO
 from typing import Iterable, List, Optional
@@ -14,10 +15,11 @@ import smart_open
 from google.cloud import storage
 from google.oauth2 import credentials, service_account
 
+from airbyte_cdk.sources.file_based.config.abstract_file_based_spec import DeliverRawFiles
 from airbyte_cdk.sources.file_based.exceptions import ErrorListingFiles, FileBasedSourceError
 from airbyte_cdk.sources.file_based.file_based_stream_reader import AbstractFileBasedStreamReader, FileReadMode
 from source_gcs.config import Config
-from source_gcs.helpers import GCSRemoteFile
+from source_gcs.helpers import GCSUploadableRemoteFile
 from source_gcs.zip_helper import ZipHelper
@@ -77,7 +79,7 @@ class SourceGCSStreamReader(AbstractFileBasedStreamReader):
     def gcs_client(self) -> storage.Client:
         return self._initialize_gcs_client()
 
-    def get_matching_files(self, globs: List[str], prefix: Optional[str], logger: logging.Logger) -> Iterable[GCSRemoteFile]:
+    def get_matching_files(self, globs: List[str], prefix: Optional[str], logger: logging.Logger) -> Iterable[GCSUploadableRemoteFile]:
         """
         Retrieve all files matching the specified glob patterns in GCS.
         """
@@ -103,10 +105,11 @@ class SourceGCSStreamReader(AbstractFileBasedStreamReader):
                 else:
                     uri = blob.generate_signed_url(expiration=timedelta(days=7), version="v4")
 
-                file_extension = ".".join(blob.name.split(".")[1:])
-                remote_file = GCSRemoteFile(uri=uri, last_modified=last_modified, mime_type=file_extension)
+                remote_file = GCSUploadableRemoteFile(
+                    uri=uri, blob=blob, last_modified=last_modified, mime_type=".".join(blob.name.split(".")[1:])
+                )
 
-                if file_extension == "zip":
+                if remote_file.mime_type == "zip" and self.config.delivery_method.delivery_type != DeliverRawFiles.delivery_type:
                     yield from ZipHelper(blob, remote_file, self.tmp_dir).get_gcs_remote_files()
                 else:
                     yield remote_file
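
As context (not in the diff): the V4 signed-URL call kept above is the standard google-cloud-storage API. A standalone sketch against a hypothetical bucket, assuming service-account credentials (signing needs a private key):

    from datetime import timedelta
    from google.cloud import storage

    client = storage.Client()                             # assumes service-account credentials
    blob = client.bucket("my-bucket").blob("data.csv")    # hypothetical bucket/object
    url = blob.generate_signed_url(expiration=timedelta(days=7), version="v4")
    print(url.split("?")[0])                              # path part, which Cursor uses as the history key
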
@@ -122,7 +125,7 @@ class SourceGCSStreamReader(AbstractFileBasedStreamReader):
                     prefix=prefix,
                 ) from exc
 
-    def open_file(self, file: GCSRemoteFile, mode: FileReadMode, encoding: Optional[str], logger: logging.Logger) -> IOBase:
+    def open_file(self, file: GCSUploadableRemoteFile, mode: FileReadMode, encoding: Optional[str], logger: logging.Logger) -> IOBase:
         """
         Open and yield a remote file from GCS for reading.
         """
@@ -8,7 +8,7 @@ from typing import Iterable
 
 from google.cloud.storage.blob import Blob
 
-from source_gcs.helpers import GCSRemoteFile
+from source_gcs.helpers import GCSUploadableRemoteFile
 
 
 logger = logging.getLogger("airbyte")
@@ -17,7 +17,7 @@ logger = logging.getLogger("airbyte")
 class ZipHelper:
     BUFFER_SIZE_DEFAULT = 1024 * 1024
 
-    def __init__(self, blob: Blob, zip_file: GCSRemoteFile, tmp_dir: tempfile.TemporaryDirectory):
+    def __init__(self, blob: Blob, zip_file: GCSUploadableRemoteFile, tmp_dir: tempfile.TemporaryDirectory):
         self._blob = blob
         self._size = blob.size
         self._tmp_dir = tmp_dir
@@ -42,16 +42,17 @@ class ZipHelper:
         with zipfile.ZipFile(bytes_io, "r") as zf:
             zf.extractall(self._tmp_dir.name)
 
-    def get_gcs_remote_files(self) -> Iterable[GCSRemoteFile]:
+    def get_gcs_remote_files(self) -> Iterable[GCSUploadableRemoteFile]:
         self._extract_files_to_tmp_directory(self._chunk_download())
 
         for unzipped_file in os.listdir(self._tmp_dir.name):
             logger.info(f"Picking up file {unzipped_file.split('/')[-1]} from zip archive {self._blob.public_url}.")
             file_extension = unzipped_file.split(".")[-1]
 
-            yield GCSRemoteFile(
+            yield GCSUploadableRemoteFile(
                 uri=os.path.join(self._tmp_dir.name, unzipped_file),  # uri to temporal local file
                 last_modified=self._zip_file.last_modified,
                 mime_type=file_extension,
                 displayed_uri=self._zip_file.uri,  # uri to remote file .zip
+                blob=self._blob,
             )
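
Separate from the diff: the extract-then-reyield pattern ZipHelper relies on boils down to unpacking an archive into a TemporaryDirectory and walking its contents. A self-contained sketch using an in-memory archive so it runs without GCS access (file names are made up):

    import io
    import os
    import tempfile
    import zipfile

    # Build a small in-memory archive as a stand-in for the downloaded blob bytes.
    buf = io.BytesIO()
    with zipfile.ZipFile(buf, "w") as zf:
        zf.writestr("part-0001.csv", "id,name\n1,a\n")

    tmp_dir = tempfile.TemporaryDirectory()
    with zipfile.ZipFile(io.BytesIO(buf.getvalue()), "r") as zf:
        zf.extractall(tmp_dir.name)

    for unzipped_file in os.listdir(tmp_dir.name):
        print(os.path.join(tmp_dir.name, unzipped_file))  # local path that becomes the file's uri
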