
🎉 New Source: GCS (#23186)

* initial commit

* fix test error

* Update get_gcs_blobs logic

* add docs

* Update source_definitions.yaml

* Update airbyte-integrations/connectors/source-gcs/source_gcs/source.py

Co-authored-by: sh4sh <6833405+sh4sh@users.noreply.github.com>

* Update airbyte-config/init/src/main/resources/seed/source_definitions.yaml

Co-authored-by: Denys Davydov <davydov.den18@gmail.com>

* Update airbyte-integrations/connectors/source-gcs/source_gcs/helpers.py

Co-authored-by: Denys Davydov <davydov.den18@gmail.com>

* Update airbyte-integrations/connectors/source-gcs/source_gcs/helpers.py

Co-authored-by: Denys Davydov <davydov.den18@gmail.com>

* update docker file for pandas package

* reimplement read_csv file

* add logic to filter selected streams

* close file_obj after reading

* fix format and tests

* add another stream

* auto-bump connector version

---------

Co-authored-by: Sunny <6833405+sh4sh@users.noreply.github.com>
Co-authored-by: Denys Davydov <davydov.den18@gmail.com>
Co-authored-by: marcosmarxm <marcosmarxm@gmail.com>
Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com>
Tuan Nguyen committed 2023-03-14 14:00:18 -04:00 (committed by GitHub)
parent 8ee32b1132, commit 200b035ec5
23 changed files with 567 additions and 0 deletions

View File

@@ -0,0 +1 @@
<svg xmlns="http://www.w3.org/2000/svg" xmlns:xlink="http://www.w3.org/1999/xlink" height="800" width="1200" viewBox="-19.20015 -28.483 166.4013 170.898"><g transform="translate(0 -7.034)"><linearGradient y2="120.789" x2="64" y1="7.034" x1="64" gradientUnits="userSpaceOnUse" id="a"><stop offset="0" stop-color="#4387fd"/><stop offset="1" stop-color="#4683ea"/></linearGradient><path d="M27.79 115.217L1.54 69.749a11.499 11.499 0 010-11.499l26.25-45.467a11.5 11.5 0 019.96-5.75h52.5a11.5 11.5 0 019.959 5.75l26.25 45.467a11.499 11.499 0 010 11.5l-26.25 45.466a11.5 11.5 0 01-9.959 5.75h-52.5a11.499 11.499 0 01-9.96-5.75z" fill="url(#a)"/></g><g transform="translate(0 -7.034)"><defs><path d="M27.791 115.217L1.541 69.749a11.499 11.499 0 010-11.499l26.25-45.467a11.499 11.499 0 019.959-5.75h52.5a11.5 11.5 0 019.96 5.75l26.25 45.467a11.499 11.499 0 010 11.5l-26.25 45.466a11.499 11.499 0 01-9.96 5.75h-52.5a11.499 11.499 0 01-9.959-5.75z" id="b"/></defs><clipPath id="c"><use height="100%" width="100%" xlink:href="#b" overflow="visible"/></clipPath><path clip-path="url(#c)" opacity=".07" d="M49.313 53.875l-7.01 6.99 5.957 5.958-5.898 10.476 44.635 44.636 10.816.002L118.936 84 85.489 50.55z"/></g><path d="M84.7 43.236H43.264c-.667 0-1.212.546-1.212 1.214v8.566c0 .666.546 1.212 1.212 1.212H84.7c.667 0 1.213-.546 1.213-1.212v-8.568c0-.666-.545-1.213-1.212-1.213m-6.416 7.976a2.484 2.484 0 01-2.477-2.48 2.475 2.475 0 012.477-2.477c1.37 0 2.48 1.103 2.48 2.477a2.48 2.48 0 01-2.48 2.48m6.415 8.491l-41.436.002c-.667 0-1.212.546-1.212 1.214v8.565c0 .666.546 1.213 1.212 1.213H84.7c.667 0 1.213-.547 1.213-1.213v-8.567c0-.666-.545-1.214-1.212-1.214m-6.416 7.976a2.483 2.483 0 01-2.477-2.48 2.475 2.475 0 012.477-2.477 2.48 2.48 0 110 4.956" fill="#fff"/></svg>


View File

@@ -649,6 +649,13 @@
  icon: freshservice.svg
  sourceType: api
  releaseStage: alpha
- name: GCS
  sourceDefinitionId: 2a8c41ae-8c23-4be0-a73f-2ab10ca1a820
  dockerRepository: airbyte/source-gcs
  dockerImageTag: 0.1.0
  documentationUrl: https://docs.airbyte.com/integrations/sources/gcs
  icon: gcs.svg
  sourceType: file
- name: Genesys
  sourceDefinitionId: 5ea4459a-8f1a-452a-830f-a65c38cc438d
  dockerRepository: airbyte/source-genesys

View File

@@ -4735,6 +4735,38 @@
    supportsNormalization: false
    supportsDBT: false
    supported_destination_sync_modes: []
- dockerImage: "airbyte/source-gcs:0.1.0"
  spec:
    documentationUrl: "https://docs.airbyte.com/integrations/sources/gcs"
    connectionSpecification:
      $schema: "http://json-schema.org/draft-07/schema#"
      title: "Gcs Spec"
      type: "object"
      required:
        - "gcs_bucket"
        - "gcs_path"
        - "service_account"
      properties:
        gcs_bucket:
          type: "string"
          title: "GCS bucket"
          description: "GCS bucket name"
        gcs_path:
          type: "string"
          title: "GCS Path"
          description: "GCS path to data"
        service_account:
          type: "string"
          title: "Service Account Information."
          description: "Enter your Google Cloud <a href=\"https://cloud.google.com/iam/docs/creating-managing-service-account-keys#creating_service_account_keys\">service account key</a> in JSON format"
          airbyte_secret: true
          examples:
            - "{ \"type\": \"service_account\", \"project_id\": YOUR_PROJECT_ID, \"private_key_id\": YOUR_PRIVATE_KEY, ... }"
    supportsNormalization: false
    supportsDBT: false
    supported_destination_sync_modes: []
- dockerImage: "airbyte/source-genesys:0.1.0"
  spec:
    documentationUrl: "https://docs.airbyte.io/integrations/sources/genesys"

View File

@@ -0,0 +1,6 @@
*
!Dockerfile
!main.py
!source_gcs
!setup.py
!secrets

View File

@@ -0,0 +1,17 @@
FROM python:3.9-slim
# Bash is installed for more convenient debugging.
RUN apt-get update && apt-get install -y bash && rm -rf /var/lib/apt/lists/*
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
WORKDIR /airbyte/integration_code
COPY source_gcs ./source_gcs
COPY setup.py ./
COPY main.py ./
RUN pip install .
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]
LABEL io.airbyte.version=0.1.0
LABEL io.airbyte.name=airbyte/source-gcs

View File

@@ -0,0 +1,129 @@
# Gcs Source
This is the repository for the Gcs source connector, written in Python.
For information about how to use this connector within Airbyte, see [the documentation](https://docs.airbyte.com/integrations/sources/gcs).
## Local development
### Prerequisites
**To iterate on this connector, make sure to complete this prerequisites section.**
#### Minimum Python version required `= 3.9.0`
#### Build & Activate Virtual Environment and install dependencies
From this connector directory, create a virtual environment:
```
python -m venv .venv
```
This will generate a virtualenv for this module in `.venv/`. Make sure this venv is active in your
development environment of choice. To activate it from the terminal, run:
```
source .venv/bin/activate
pip install -r requirements.txt
```
If you are in an IDE, follow your IDE's instructions to activate the virtualenv.
Note that while we are installing dependencies from `requirements.txt`, you should only edit `setup.py` for your dependencies. `requirements.txt` is
used for editable installs (`pip install -e`) to pull in Python dependencies from the monorepo and will call `setup.py`.
If this is mumbo jumbo to you, don't worry about it, just put your deps in `setup.py` but install using `pip install -r requirements.txt` and everything
should work as you expect.
#### Building via Gradle
From the Airbyte repository root, run:
```
./gradlew :airbyte-integrations:connectors:source-gcs:build
```
#### Create credentials
**If you are a community contributor**, follow the instructions in the [documentation](https://docs.airbyte.com/integrations/sources/gcs)
to generate the necessary credentials. Then create a file `secrets/config.json` conforming to the `source_gcs/spec.yaml` file.
Note that the `secrets` directory is gitignored by default, so there is no danger of accidentally checking in sensitive information.
See `integration_tests/sample_config.json` for a sample config file.
**If you are an Airbyte core member**, copy the credentials in Lastpass under the secret name `source gcs test creds`
and place them into `secrets/config.json`.
### Locally running the connector
```
python main.py spec
python main.py check --config secrets/config.json
python main.py discover --config secrets/config.json
python main.py read --config secrets/config.json --catalog integration_tests/configured_catalog.json
```
### Locally running the connector docker image
#### Build
First, make sure you build the latest Docker image:
```
docker build . -t airbyte/source-gcs:dev
```
You can also build the connector image via Gradle:
```
./gradlew :airbyte-integrations:connectors:source-gcs:airbyteDocker
```
When building via Gradle, the docker image name and tag, respectively, are the values of the `io.airbyte.name` and `io.airbyte.version` `LABEL`s in
the Dockerfile.
#### Run
Then run any of the connector commands as follows:
```
docker run --rm airbyte/source-gcs:dev spec
docker run --rm -v $(pwd)/secrets:/secrets airbyte/source-gcs:dev check --config /secrets/config.json
docker run --rm -v $(pwd)/secrets:/secrets airbyte/source-gcs:dev discover --config /secrets/config.json
docker run --rm -v $(pwd)/secrets:/secrets -v $(pwd)/integration_tests:/integration_tests airbyte/source-gcs:dev read --config /secrets/config.json --catalog /integration_tests/configured_catalog.json
```
## Testing
Make sure to familiarize yourself with [pytest test discovery](https://docs.pytest.org/en/latest/goodpractices.html#test-discovery) to know how your test files and methods should be named.
First install test dependencies into your virtual environment:
```
pip install .[tests]
```
### Unit Tests
To run unit tests locally, from the connector directory run:
```
python -m pytest unit_tests
```
### Integration Tests
There are two types of integration tests: Acceptance Tests (Airbyte's test suite for all source connectors) and custom integration tests (which are specific to this connector).
#### Custom Integration tests
Place custom tests inside the `integration_tests/` folder; then, from the connector root, run
```
python -m pytest integration_tests
```
#### Acceptance Tests
Customize the `acceptance-test-config.yml` file to configure the tests. See [Connector Acceptance Tests](https://docs.airbyte.com/connector-development/testing-connectors/connector-acceptance-tests-reference) for more information.
If your connector requires creating or destroying resources for use during acceptance tests, create fixtures for them and place them inside `integration_tests/acceptance.py`; a sketch follows the command below.
To run your integration tests together with acceptance tests, from the connector root, run
```
python -m pytest integration_tests -p integration_tests.acceptance
```
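The shipped `integration_tests/acceptance.py` (shown further down) only yields from a placeholder fixture. If the tests ever needed a real object in the bucket, the fixture could provision and tear one down. A minimal sketch, not part of this PR, assuming valid credentials in `secrets/config.json` (the object name and CSV content are hypothetical):
```
# Hypothetical variant of integration_tests/acceptance.py -- illustration only.
import json

import pytest

from source_gcs.helpers import get_gcs_client

pytest_plugins = ("connector_acceptance_test.plugin",)


@pytest.fixture(scope="session", autouse=True)
def connector_setup():
    """Upload a throwaway CSV before the suite runs and delete it afterwards."""
    with open("secrets/config.json") as f:
        config = json.load(f)
    bucket = get_gcs_client(config).get_bucket(config["gcs_bucket"])
    blob = bucket.blob("acceptance/example_fixture.csv")  # hypothetical object name
    blob.upload_from_string("id,name\n1,Alice\n2,Bob\n")
    yield
    blob.delete()
```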
To run your integration tests with Docker, use the Gradle commands below.
### Using gradle to run tests
All commands should be run from airbyte project root.
To run unit tests:
```
./gradlew :airbyte-integrations:connectors:source-gcs:unitTest
```
To run acceptance and custom integration tests:
```
./gradlew :airbyte-integrations:connectors:source-gcs:integrationTest
```
## Dependency Management
All of your dependencies should go in `setup.py`, NOT `requirements.txt`. The requirements file is only used to connect internal Airbyte dependencies in the monorepo for local development.
We split dependencies between two groups:
* dependencies required for your connector to work go in the `MAIN_REQUIREMENTS` list;
* dependencies required for testing go in the `TEST_REQUIREMENTS` list.
### Publishing a new version of the connector
You've checked out the repo, implemented a million dollar feature, and you're ready to share your changes with the world. Now what?
1. Make sure your changes are passing unit and integration tests.
1. Bump the connector version in `Dockerfile` -- just increment the value of the `LABEL io.airbyte.version` appropriately (we use [SemVer](https://semver.org/)); see the example after this list.
1. Create a Pull Request.
1. Pat yourself on the back for being an awesome contributor.
1. Someone from Airbyte will take a look at your PR and iterate with you to merge it into master.
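For the version bump, a patch release on top of the current `0.1.0` would change only the version label at the bottom of the `Dockerfile`:
```
LABEL io.airbyte.version=0.1.1
```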

View File

@@ -0,0 +1,27 @@
# See [Connector Acceptance Tests](https://docs.airbyte.com/connector-development/testing-connectors/connector-acceptance-tests-reference)
# for more information about how to configure these tests
connector_image: airbyte/source-gcs:dev
acceptance_tests:
  spec:
    tests:
      - spec_path: "source_gcs/spec.yaml"
  connection:
    tests:
      - config_path: "secrets/config.json"
        status: "succeed"
      - config_path: "integration_tests/invalid_config.json"
        status: "failed"
  discovery:
    tests:
      - config_path: "secrets/config.json"
  basic_read:
    tests:
      - config_path: "secrets/config.json"
        configured_catalog_path: "integration_tests/configured_catalog.json"
        empty_streams: []
  incremental:
    bypass_reason: "This connector does not implement incremental sync"
  full_refresh:
    tests:
      - config_path: "secrets/config.json"
        configured_catalog_path: "integration_tests/configured_catalog.json"

View File

@@ -0,0 +1,16 @@
#!/usr/bin/env sh
# Build latest connector image
docker build . -t $(cat acceptance-test-config.yml | grep "connector_image" | head -n 1 | cut -d: -f2-)
# Pull latest acctest image
docker pull airbyte/connector-acceptance-test:latest
# Run
docker run --rm -it \
-v /var/run/docker.sock:/var/run/docker.sock \
-v /tmp:/tmp \
-v $(pwd):/test_input \
airbyte/connector-acceptance-test \
--acceptance-test-config /test_input

View File

@@ -0,0 +1,9 @@
plugins {
    id 'airbyte-python'
    id 'airbyte-docker'
    id 'airbyte-connector-acceptance-test'
}

airbytePython {
    moduleDirectory 'source_gcs'
}

View File

@@ -0,0 +1,3 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

View File

@@ -0,0 +1,14 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
import pytest

pytest_plugins = ("connector_acceptance_test.plugin",)


@pytest.fixture(scope="session", autouse=True)
def connector_setup():
    """This fixture is a placeholder for external resources that acceptance tests might require."""
    yield

View File

@@ -0,0 +1,22 @@
{
"streams": [
{
"stream": {
"name": "example_1",
"json_schema": {},
"supported_sync_modes": ["full_refresh"]
},
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite"
},
{
"stream": {
"name": "example_2",
"json_schema": {},
"supported_sync_modes": ["full_refresh"]
},
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite"
}
]
}

View File

@@ -0,0 +1,3 @@
{
"todo-wrong-field": "this should be an incomplete config file, used in standard tests"
}

View File

@@ -0,0 +1,5 @@
{
"gcs_path": "",
"gcs_bucket": "airbyte-gcs-source",
"service_account": "{\"type\": \"service_account\", \"project_id\": ..."
}

View File

@@ -0,0 +1,13 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
import sys

from airbyte_cdk.entrypoint import launch

from source_gcs import SourceGCS

if __name__ == "__main__":
    source = SourceGCS()
    launch(source, sys.argv[1:])

View File

@@ -0,0 +1,3 @@
# This file is autogenerated -- only edit if you know what you are doing. Use setup.py for declaring dependencies.
-e ../../bases/connector-acceptance-test
-e .

View File

@@ -0,0 +1,26 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
from setuptools import find_packages, setup
MAIN_REQUIREMENTS = ["airbyte-cdk~=0.2", "google-cloud-storage==2.5.0", "pandas==1.5.3"]
TEST_REQUIREMENTS = [
"pytest~=6.2",
"connector-acceptance-test",
]
setup(
name="source_gcs",
description="Source implementation for Gcs.",
author="Airbyte",
author_email="contact@airbyte.io",
packages=find_packages(),
install_requires=MAIN_REQUIREMENTS,
package_data={"": ["*.json", "*.yaml"]},
extras_require={
"tests": TEST_REQUIREMENTS,
},
)

View File

@@ -0,0 +1,8 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
from .source import SourceGCS
__all__ = ["SourceGCS"]

View File

@@ -0,0 +1,59 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
import io
import json
import pandas as pd
from google.cloud import storage
from google.cloud.storage.blob import Blob
from google.oauth2 import service_account
def get_gcs_client(config):
    credentials = service_account.Credentials.from_service_account_info(json.loads(config.get("service_account")))
    client = storage.Client(credentials=credentials)
    return client


def get_gcs_blobs(config):
    client = get_gcs_client(config)
    bucket = client.get_bucket(config.get("gcs_bucket"))
    blobs = bucket.list_blobs(prefix=config.get("gcs_path"))
    # TODO: Only CSV is supported initially. Change this check when implementing other file formats.
    blobs = [blob for blob in blobs if "csv" in blob.name.lower()]
    return blobs


def read_csv_file(blob: Blob, read_header_only=False):
    # Download the blob into an in-memory buffer rather than onto disk.
    file_obj = io.BytesIO()
    blob.download_to_file(file_obj)
    file_obj.seek(0)
    if read_header_only:
        # nrows=0 parses only the header row, which is enough to infer column names.
        df = pd.read_csv(file_obj, nrows=0)
    else:
        df = pd.read_csv(file_obj)
    file_obj.close()
    return df


def construct_file_schema(df):
    # Fix all columns to string for maximum compatibility,
    # then build a JSON schema object from the column names.
    schema = {
        "$schema": "http://json-schema.org/draft-07/schema#",
        "type": "object",
        "properties": {col: {"type": "string"} for col in df.columns},
    }
    return schema


def get_stream_name(blob):
    blob_name = blob.name
    # Remove the path prefix from the stream name.
    blob_name_without_path = blob_name.split("/")[-1]
    # Remove the file extension from the stream name.
    stream_name = blob_name_without_path.replace(".csv", "")
    return stream_name
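
Taken together, the helpers compose into a simple discovery pass. A minimal sketch, not part of this PR, assuming valid credentials in `secrets/config.json`:
```
import json

from source_gcs.helpers import construct_file_schema, get_gcs_blobs, get_stream_name, read_csv_file

with open("secrets/config.json") as f:
    config = json.load(f)

for blob in get_gcs_blobs(config):
    header_df = read_csv_file(blob, read_header_only=True)  # parses the header row only
    print(get_stream_name(blob), "->", construct_file_schema(header_df))
```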

View File

@@ -0,0 +1,72 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
from datetime import datetime
from typing import Any, Dict, Generator, Mapping
from airbyte_cdk.logger import AirbyteLogger
from airbyte_cdk.models import (
AirbyteCatalog,
AirbyteConnectionStatus,
AirbyteMessage,
AirbyteRecordMessage,
AirbyteStream,
ConfiguredAirbyteCatalog,
Status,
Type,
)
from airbyte_cdk.sources import Source
from .helpers import construct_file_schema, get_gcs_blobs, get_stream_name, read_csv_file
class SourceGCS(Source):
    def check(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> AirbyteConnectionStatus:
        """
        Check that a client can be created and that the bucket contains at least one compatible file.
        """
        try:
            blobs = get_gcs_blobs(config)
            if not blobs:
                return AirbyteConnectionStatus(status=Status.FAILED, message="No compatible file found in bucket")
            return AirbyteConnectionStatus(status=Status.SUCCEEDED)
        except Exception as e:
            return AirbyteConnectionStatus(status=Status.FAILED, message=f"An exception occurred: {str(e)}")

    def discover(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> AirbyteCatalog:
        streams = []
        blobs = get_gcs_blobs(config)
        for blob in blobs:
            # Read only the CSV header to determine the schema.
            df = read_csv_file(blob, read_header_only=True)
            stream_name = get_stream_name(blob)
            json_schema = construct_file_schema(df)
            streams.append(AirbyteStream(name=stream_name, json_schema=json_schema, supported_sync_modes=["full_refresh"]))
        return AirbyteCatalog(streams=streams)

    def read(
        self, logger: AirbyteLogger, config: Mapping[str, Any], catalog: ConfiguredAirbyteCatalog, state: Dict[str, Any]
    ) -> Generator[AirbyteMessage, None, None]:
        logger.info("Start reading")
        blobs = get_gcs_blobs(config)

        # Read only the stream(s) selected in the configured catalog.
        selected_streams = [configured_stream.stream.name for configured_stream in catalog.streams]
        selected_blobs = [blob for blob in blobs if get_stream_name(blob) in selected_streams]

        for blob in selected_blobs:
            logger.info(blob.name)
            df = read_csv_file(blob)
            stream_name = get_stream_name(blob)
            for _, row in df.iterrows():
                # Cast every value to string to match the declared schema.
                row_dict = {k: str(v) for k, v in row.to_dict().items()}
                yield AirbyteMessage(
                    type=Type.RECORD,
                    record=AirbyteRecordMessage(stream=stream_name, data=row_dict, emitted_at=int(datetime.now().timestamp() * 1000)),
                )
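
For a quick smoke test outside the Airbyte entrypoint, the source can be exercised directly. A sketch, not part of this PR, assuming valid credentials in `secrets/config.json`:
```
import json

from airbyte_cdk.logger import AirbyteLogger
from source_gcs import SourceGCS

with open("secrets/config.json") as f:
    config = json.load(f)

source = SourceGCS()
print(source.check(AirbyteLogger(), config))
for stream in source.discover(AirbyteLogger(), config).streams:
    print(stream.name)
```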

View File

@@ -0,0 +1,26 @@
documentationUrl: https://docs.airbyte.com/integrations/sources/gcs
connectionSpecification:
  $schema: http://json-schema.org/draft-07/schema#
  title: Gcs Spec
  type: object
  required:
    - gcs_bucket
    - gcs_path
    - service_account
  properties:
    gcs_bucket:
      type: string
      title: GCS bucket
      description: GCS bucket name
    gcs_path:
      type: string
      title: GCS Path
      description: GCS path to data
    service_account:
      type: string
      title: Service Account Information.
      description: 'Enter your Google Cloud <a href="https://cloud.google.com/iam/docs/creating-managing-service-account-keys#creating_service_account_keys">service account key</a> in JSON format'
      airbyte_secret: true
      examples:
        - '{ "type": "service_account", "project_id": YOUR_PROJECT_ID, "private_key_id": YOUR_PRIVATE_KEY, ... }'

View File

@@ -0,0 +1,34 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
import unittest
from io import BytesIO
import pandas as pd
from source_gcs.helpers import construct_file_schema
class TestGCSFunctions(unittest.TestCase):
    def setUp(self):
        # Initialize the mock config
        self.config = {
            "service_account": '{"test_key": "test_value"}',
            "gcs_bucket": "test_bucket",
            "gcs_path": "test_path",
        }

    def test_construct_file_schema(self):
        # Test that the function correctly constructs a JSON schema for a DataFrame
        df = pd.read_csv(BytesIO(b"id,name\n1,Alice\n2,Bob\n3,Charlie\n"))
        schema = construct_file_schema(df)
        expected_schema = {
            "$schema": "http://json-schema.org/draft-07/schema#",
            "type": "object",
            "properties": {
                "id": {"type": "string"},
                "name": {"type": "string"},
            },
        }
        self.assertEqual(schema, expected_schema)

View File

@@ -0,0 +1,35 @@
# GCS
This page guides you through the process of setting up the GCS source connector. This connector supports loading multiple uncompressed CSV files from a GCS directory. The connector picks up every file ending in `.csv`, including nested files.
## Prerequisites
* JSON credentials for the service account that has access to GCS. For more details, see the [instructions](https://cloud.google.com/iam/docs/creating-managing-service-accounts)
* GCS bucket
* Path to file(s)
## Set up Source
### Create a Service Account
First, select an existing project or create a new one in the Google Cloud Console:
1. Sign in to your Google account.
2. Go to the [Service Accounts](https://console.developers.google.com/iam-admin/serviceaccounts) page.
3. Click `Create service account`.
4. Create a JSON key file for the service user. The contents of this file will be provided as the `service_account` in the UI.
### Grant permission to GCS
Using the service account ID from above, grant read access to your target bucket. See [here](https://cloud.google.com/storage/docs/access-control/using-iam-permissions) for more details.
### Set up the source in Airbyte UI
* Paste the service account JSON key into `service_account`
* Enter your GCS bucket name in `gcs_bucket`
* Enter the path to your file(s) in `gcs_path`
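A filled-in configuration would look roughly like this (values are illustrative and the service account key is abbreviated; see `integration_tests/sample_config.json` in the connector for a template):
```
{
  "gcs_bucket": "airbyte-gcs-source",
  "gcs_path": "folder/with/csv/files",
  "service_account": "{ \"type\": \"service_account\", \"project_id\": \"...\", ... }"
}
```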
## Changelog
| Version | Date | Pull Request | Subject |
| :------ | :--------- | :------------------------------------------------------- | :-------------------------- |
| 0.1.0 | 2023-02-16 | [23186](https://github.com/airbytehq/airbyte/pull/23186) | New Source: GCS |