1
0
mirror of synced 2026-01-24 16:01:55 -05:00

Naively merge all Python modules into one. (#3148)

* naively merge all python base modules into one

* Formatting changes.

Co-authored-by: Davin Chia <davinchia@gmail.com>
This commit is contained in:
Sherif A. Nada
2021-04-30 00:17:55 -07:00
committed by GitHub
parent fc00d36d0c
commit e9287e7964
39 changed files with 3251 additions and 14 deletions

View File

@@ -0,0 +1,59 @@
# --LICENSE--#
# MIT License
#
# Copyright (c) 2020 Airbyte
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
# --LICENSE--#
from airbyte_cdk.base_python.catalog_helpers import CatalogHelper
from airbyte_cdk.base_python.cdk.abstract_source import AbstractSource
# Separate the SDK imports so they can be moved somewhere else more easily
from airbyte_cdk.base_python.cdk.streams.auth.core import HttpAuthenticator
from airbyte_cdk.base_python.cdk.streams.auth.oauth import Oauth2Authenticator
from airbyte_cdk.base_python.cdk.streams.auth.token import TokenAuthenticator
from airbyte_cdk.base_python.cdk.streams.core import Stream
from airbyte_cdk.base_python.cdk.streams.http import HttpStream
from airbyte_cdk.base_python.client import BaseClient
from airbyte_cdk.base_python.integration import AirbyteSpec, Destination, Integration, Source
from airbyte_cdk.base_python.logger import AirbyteLogger
from airbyte_cdk.base_python.source import BaseSource
# Must be the last one because the way we load the connector module creates a circular
# dependency and models might not have been loaded yet
from airbyte_cdk.base_python.entrypoint import AirbyteEntrypoint # noqa isort:skip
# Names re-exported as the public API of this package (what `from <package> import *` exposes).
# NOTE(review): AirbyteEntrypoint is imported above but is not listed here — confirm the omission is intentional.
__all__ = [
"AirbyteLogger",
"AirbyteSpec",
"AbstractSource",
"BaseClient",
"BaseSource",
"CatalogHelper",
"Destination",
"HttpAuthenticator",
"HttpStream",
"Integration",
"Oauth2Authenticator",
"Source",
"Stream",
"TokenAuthenticator",
]

View File

@@ -0,0 +1,42 @@
# MIT License
#
# Copyright (c) 2020 Airbyte
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
from airbyte_cdk.models import AirbyteCatalog, SyncMode
class CatalogHelper:
    """Helper routines for transforming Airbyte catalogs."""

    @staticmethod
    def coerce_catalog_as_full_refresh(catalog: AirbyteCatalog) -> AirbyteCatalog:
        """
        Build a catalog in which every stream only supports the full refresh sync mode.

        The input catalog is left untouched; all changes are applied to a deep copy.

        :param catalog: The catalog to coerce.
        :return: A new catalog whose streams have no source defined cursor, no default cursor field and
            full refresh as their only supported sync mode. Unset and null fields are dropped.
        """
        # A shallow copy would share the stream objects with the input catalog, so mutating them
        # would silently alter the caller's catalog. Deep copy and mutate the copy instead.
        coerced_catalog = catalog.copy(deep=True)
        for stream in coerced_catalog.streams:
            stream.source_defined_cursor = False
            stream.supported_sync_modes = [SyncMode.full_refresh]
            stream.default_cursor_field = None

        # remove nulls by round-tripping through JSON
        return AirbyteCatalog.parse_raw(coerced_catalog.json(exclude_unset=True, exclude_none=True))

View File

@@ -0,0 +1 @@
# Initialize SDK Package

View File

@@ -0,0 +1,184 @@
# MIT License
#
# Copyright (c) 2020 Airbyte
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
import copy
from abc import ABC, abstractmethod
from datetime import datetime
from typing import Any, Iterator, List, Mapping, MutableMapping, Optional, Tuple
from airbyte_cdk.models import (
AirbyteCatalog,
AirbyteConnectionStatus,
AirbyteMessage,
AirbyteRecordMessage,
AirbyteStateMessage,
ConfiguredAirbyteCatalog,
ConfiguredAirbyteStream,
Status,
SyncMode,
)
from airbyte_cdk.models import Type as MessageType
from airbyte_cdk.base_python.cdk.streams.core import Stream
from airbyte_cdk.base_python.integration import Source
from airbyte_cdk.base_python.logger import AirbyteLogger
class AbstractSource(Source, ABC):
    """
    Abstract base class for an Airbyte Source. Consumers should implement any abstract methods
    in this class to create an Airbyte Specification compliant Source.
    """

    @abstractmethod
    def check_connection(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> Tuple[bool, Optional[Any]]:
        """
        :param logger: The logger passed through from check().
        :param config: The user-provided configuration as specified by the source's spec. This usually contains information required to check
            connection e.g. tokens, secrets and keys etc.
        :return: A tuple of (boolean, error). If boolean is true, then the connection check is successful and we can connect to the underlying
            data source using the provided configuration.
            Otherwise, the input config cannot be used to connect to the underlying data source, and the "error" object should describe what
            went wrong. The error object will be cast to string to display the problem to the user.
        """

    @abstractmethod
    def streams(self, config: Mapping[str, Any]) -> List[Stream]:
        """
        :param config: The user-provided configuration as specified by the source's spec. Any stream construction related operation should
            happen here.
        :return: A list of the streams in this source connector.
        """

    @property
    def name(self) -> str:
        """Source name. This is the name of the implementing class."""
        return self.__class__.__name__

    def discover(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> AirbyteCatalog:
        """Implements the Discover operation from the Airbyte Specification. See https://docs.airbyte.io/architecture/airbyte-specification."""
        streams = [stream.as_airbyte_stream() for stream in self.streams(config=config)]
        return AirbyteCatalog(streams=streams)

    def check(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> AirbyteConnectionStatus:
        """Implements the Check Connection operation from the Airbyte Specification. See https://docs.airbyte.io/architecture/airbyte-specification."""
        try:
            check_succeeded, error = self.check_connection(logger, config)
            if not check_succeeded:
                return AirbyteConnectionStatus(status=Status.FAILED, message=str(error))
        except Exception as e:
            # Any exception raised by the connector's check is reported as a failed status rather than propagated.
            return AirbyteConnectionStatus(status=Status.FAILED, message=str(e))

        return AirbyteConnectionStatus(status=Status.SUCCEEDED)

    def read(
        self,
        logger: AirbyteLogger,
        config: Mapping[str, Any],
        catalog: ConfiguredAirbyteCatalog,
        state: Optional[MutableMapping[str, Any]] = None,
    ) -> Iterator[AirbyteMessage]:
        """
        Implements the Read operation from the Airbyte Specification. See https://docs.airbyte.io/architecture/airbyte-specification.

        :param logger: Logger used to report progress and failures.
        :param config: The user-provided configuration as specified by the source's spec.
        :param catalog: The configured catalog listing the streams to read.
        :param state: The connector state from which to resume; it is deep-copied, so the input is never mutated.
        :return: An iterator over the messages produced while reading every configured stream.
        :raises KeyError: if a configured stream is not one of the streams returned by streams().
        """
        connector_state = copy.deepcopy(state or {})
        logger.info(f"Starting syncing {self.name}")
        # get the streams once in case the connector needs to make any queries to generate them
        stream_instances = {s.name: s for s in self.streams(config)}
        for configured_stream in catalog.streams:
            stream_name = configured_stream.stream.name
            try:
                if stream_name not in stream_instances:
                    raise KeyError(
                        f"The requested stream {stream_name} was not found in the source. Available streams: {list(stream_instances)}"
                    )
                yield from self._read_stream(
                    logger=logger,
                    stream_instance=stream_instances[stream_name],
                    configured_stream=configured_stream,
                    connector_state=connector_state,
                )
            except Exception:
                # Report the stream that failed (not the source name), then re-raise with the original traceback.
                logger.exception(f"Encountered an exception while reading stream {stream_name}")
                raise

        logger.info(f"Finished syncing {self.name}")

    def _read_stream(
        self,
        logger: AirbyteLogger,
        stream_instance: Stream,
        configured_stream: ConfiguredAirbyteStream,
        connector_state: MutableMapping[str, Any],
    ) -> Iterator[AirbyteMessage]:
        """Read a single stream, incrementally when both configured and supported, and log how many records were read."""
        use_incremental = configured_stream.sync_mode == SyncMode.incremental and stream_instance.supports_incremental
        if use_incremental:
            record_iterator = self._read_incremental(logger, stream_instance, configured_stream, connector_state)
        else:
            record_iterator = self._read_full_refresh(stream_instance, configured_stream)

        record_counter = 0
        stream_name = configured_stream.stream.name
        logger.info(f"Syncing stream: {stream_name} ")
        for record in record_iterator:
            # STATE messages are forwarded as well but are not counted as records.
            if record.type == MessageType.RECORD:
                record_counter += 1
            yield record

        logger.info(f"Read {record_counter} records from {stream_name} stream")

    def _read_incremental(
        self,
        logger: AirbyteLogger,
        stream_instance: Stream,
        configured_stream: ConfiguredAirbyteStream,
        connector_state: MutableMapping[str, Any],
    ) -> Iterator[AirbyteMessage]:
        """Read a stream slice by slice in incremental mode, emitting RECORD messages interleaved with STATE checkpoints."""
        stream_name = configured_stream.stream.name
        stream_state = connector_state.get(stream_name, {})
        if stream_state:
            # stream_state is already this stream's own state, so log it directly.
            logger.info(f"Setting state of {stream_name} stream to {stream_state}")

        checkpoint_interval = stream_instance.state_checkpoint_interval
        slices = stream_instance.stream_slices(
            cursor_field=configured_stream.cursor_field, sync_mode=SyncMode.incremental, stream_state=stream_state
        )
        for stream_slice in slices:
            # The counter restarts for every slice, so the checkpoint interval is counted per slice.
            record_counter = 0
            records = stream_instance.read_records(
                sync_mode=SyncMode.incremental,
                stream_slice=stream_slice,
                stream_state=stream_state,
                cursor_field=configured_stream.cursor_field or None,
            )
            for record_data in records:
                record_counter += 1
                yield self._as_airbyte_record(stream_name, record_data)
                stream_state = stream_instance.get_updated_state(stream_state, record_data)
                if checkpoint_interval and record_counter % checkpoint_interval == 0:
                    yield self._checkpoint_state(stream_name, stream_state, connector_state, logger)

            # Always checkpoint once a slice has been read completely.
            yield self._checkpoint_state(stream_name, stream_state, connector_state, logger)

    def _read_full_refresh(self, stream_instance: Stream, configured_stream: ConfiguredAirbyteStream) -> Iterator[AirbyteMessage]:
        """Read every slice of a stream in full refresh mode and wrap each record in a RECORD message."""
        args = {"sync_mode": SyncMode.full_refresh, "cursor_field": configured_stream.cursor_field}
        for stream_slice in stream_instance.stream_slices(**args):
            for record in stream_instance.read_records(stream_slice=stream_slice, **args):
                yield self._as_airbyte_record(configured_stream.stream.name, record)

    def _checkpoint_state(self, stream_name, stream_state, connector_state, logger):
        """Store the stream's state in the connector state and return a STATE message carrying the whole connector state."""
        logger.info(f"Setting state of {stream_name} stream to {stream_state}")
        connector_state[stream_name] = stream_state
        return AirbyteMessage(type=MessageType.STATE, state=AirbyteStateMessage(data=connector_state))

    def _as_airbyte_record(self, stream_name: str, data: Mapping[str, Any]):
        """Wrap a record's data in a RECORD message stamped with the current time in milliseconds."""
        # Scale to milliseconds before truncating; truncating first would drop the sub-second part.
        now_millis = int(datetime.now().timestamp() * 1000)
        message = AirbyteRecordMessage(stream=stream_name, data=data, emitted_at=now_millis)
        return AirbyteMessage(type=MessageType.RECORD, record=message)

View File

@@ -0,0 +1 @@
# Initialize Streams Package

View File

@@ -0,0 +1 @@
# Initialize Auth Package

View File

@@ -0,0 +1,45 @@
# MIT License
#
# Copyright (c) 2020 Airbyte
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
from abc import ABC, abstractmethod
from typing import Any, Mapping
class HttpAuthenticator(ABC):
    """
    Abstract foundation for HTTP authentication strategies. Concrete strategies are generally
    expected to supply their security credentials through HTTP headers.
    """

    @abstractmethod
    def get_auth_header(self) -> Mapping[str, Any]:
        """
        :return: A mapping holding every header required to authenticate a request.
        """
class NoAuth(HttpAuthenticator):
    """Authenticator that contributes no authentication headers at all."""

    def get_auth_header(self) -> Mapping[str, Any]:
        """
        :return: An empty mapping, i.e. no headers are added to the request.
        """
        no_headers = {}
        return no_headers

View File

@@ -0,0 +1,34 @@
# MIT License
#
# Copyright (c) 2020 Airbyte
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
from typing import Any, Mapping
from airbyte_cdk.base_python.cdk.streams.auth.core import HttpAuthenticator
class JWTAuthenticator(HttpAuthenticator):
    """Placeholder authenticator: building the authentication header is not implemented yet."""

    def get_auth_header(self) -> Mapping[str, Any]:
        """
        :raises NotImplementedError: always, until this authenticator is implemented.
        """
        # TODO
        raise NotImplementedError()

View File

@@ -0,0 +1,88 @@
# MIT License
#
# Copyright (c) 2020 Airbyte
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
from typing import Any, List, Mapping, Optional, Tuple

import pendulum
import requests
from airbyte_cdk.base_python.cdk.streams.auth.core import HttpAuthenticator
class Oauth2Authenticator(HttpAuthenticator):
    """
    Generates OAuth2.0 access tokens from an OAuth2.0 refresh token and client credentials.
    The generated access token is attached to each request via the Authorization header.
    """

    def __init__(
        self,
        token_refresh_endpoint: str,
        client_id: str,
        client_secret: str,
        refresh_token: str,
        scopes: Optional[List[str]] = None,
    ):
        """
        :param token_refresh_endpoint: URL that the refresh request is POSTed to.
        :param client_id: Sent as "client_id" in the refresh request body.
        :param client_secret: Sent as "client_secret" in the refresh request body.
        :param refresh_token: Sent as "refresh_token" in the refresh request body.
        :param scopes: Optional scopes; when truthy they are sent as "scopes" in the refresh request body.
        """
        self.token_refresh_endpoint = token_refresh_endpoint
        self.client_secret = client_secret
        self.client_id = client_id
        self.refresh_token = refresh_token
        self.scopes = scopes

        # Start with an expiry date in the past so that the first get_access_token() call refreshes the token.
        self._token_expiry_date = pendulum.now().subtract(days=1)
        self._access_token = None

    def get_auth_header(self) -> Mapping[str, Any]:
        """
        :return: The Authorization header carrying a bearer access token, refreshed first if it has expired.
        """
        return {"Authorization": f"Bearer {self.get_access_token()}"}

    def get_access_token(self):
        """
        :return: The cached access token, after refreshing it if it has expired.
        """
        if self.token_has_expired():
            # Take the timestamp before the request so that the computed expiry date errs on the early side.
            t0 = pendulum.now()
            token, expires_in = self.refresh_access_token()
            self._access_token = token
            self._token_expiry_date = t0.add(seconds=expires_in)

        return self._access_token

    def token_has_expired(self) -> bool:
        """
        :return: True if the current time is past the stored token expiry date.
        """
        return pendulum.now() > self._token_expiry_date

    def get_refresh_request_body(self) -> Mapping[str, Any]:
        """ Override to define additional parameters """
        payload = {
            "grant_type": "refresh_token",
            "client_id": self.client_id,
            "client_secret": self.client_secret,
            "refresh_token": self.refresh_token,
        }

        if self.scopes:
            payload["scopes"] = self.scopes

        return payload

    def refresh_access_token(self) -> Tuple[str, int]:
        """
        POSTs the refresh request body to the token refresh endpoint.

        :return: a tuple of (access_token, token_lifespan_in_seconds)
        :raises Exception: if the request fails or the response lacks the expected fields; the original error is chained as the cause.
        """
        try:
            response = requests.request(method="POST", url=self.token_refresh_endpoint, data=self.get_refresh_request_body())
            response.raise_for_status()
            response_json = response.json()
            return response_json["access_token"], response_json["expires_in"]
        except Exception as e:
            raise Exception(f"Error while refreshing access token: {e}") from e

View File

@@ -0,0 +1,38 @@
# MIT License
#
# Copyright (c) 2020 Airbyte
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
from typing import Any, Mapping
from airbyte_cdk.base_python.cdk.streams.auth.core import HttpAuthenticator
class TokenAuthenticator(HttpAuthenticator):
    """
    Authenticator that sends a fixed token in a single header, formatted as "<auth_method> <token>".
    """

    def __init__(self, token: str, auth_method: str = "Bearer", auth_header: str = "Authorization"):
        """
        :param token: The token placed in the header value.
        :param auth_method: Prefix placed before the token in the header value.
        :param auth_header: Name of the header that carries the credentials.
        """
        self._token = token
        self.auth_header = auth_header
        self.auth_method = auth_method

    def get_auth_header(self) -> Mapping[str, Any]:
        """
        :return: A single-entry mapping of the configured header name to "<auth_method> <token>".
        """
        header_value = f"{self.auth_method} {self._token}"
        return {self.auth_header: header_value}

View File

@@ -0,0 +1,151 @@
# MIT License
#
# Copyright (c) 2020 Airbyte
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
import inspect
from abc import ABC, abstractmethod
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Union
import airbyte_cdk.base_python.cdk.utils.casing as casing
from airbyte_cdk.models import AirbyteStream, SyncMode
from airbyte_cdk.base_python.logger import AirbyteLogger
from airbyte_cdk.base_python.schema_helpers import ResourceSchemaLoader
def package_name_from_class(cls: object) -> str:
    """Return the top-level package name of the module in which the given class is defined."""
    defining_module = inspect.getmodule(cls)
    # Everything before the first dot of the module's dotted name is the top-level package.
    top_level_package, _, _ = defining_module.__name__.partition(".")
    return top_level_package
class Stream(ABC):
    """
    Base abstract class for an Airbyte Stream. Makes no assumption of the Stream's underlying transport protocol.
    """

    # Use self.logger in subclasses to log any messages
    logger = AirbyteLogger()  # TODO use native "logging" loggers with custom handlers

    @property
    def name(self) -> str:
        """
        :return: Stream name. By default this is the implementing class name converted from camel case to snake case,
            but it can be overridden as needed.
        """
        return casing.camel_to_snake(self.__class__.__name__)

    @abstractmethod
    def read_records(
        self,
        sync_mode: SyncMode,
        cursor_field: Optional[List[str]] = None,
        stream_slice: Optional[Mapping[str, Any]] = None,
        stream_state: Optional[Mapping[str, Any]] = None,
    ) -> Iterable[Mapping[str, Any]]:
        """
        This method should be overridden by subclasses to read records based on the inputs
        """

    def get_json_schema(self) -> Mapping[str, Any]:
        """
        :return: A dict of the JSON schema representing this stream.

        The default implementation of this method looks for a JSONSchema file with the same name as this stream's "name" property.
        Override as needed.
        """
        # TODO show an example of using pydantic to define the JSON schema, or reading an OpenAPI spec
        return ResourceSchemaLoader(package_name_from_class(self.__class__)).get_schema(self.name)

    def as_airbyte_stream(self) -> AirbyteStream:
        """
        :return: The AirbyteStream describing this stream. Full refresh is always supported; incremental support and the
            cursor information are added only when the stream supports incremental reads.
        """
        stream = AirbyteStream(name=self.name, json_schema=dict(self.get_json_schema()), supported_sync_modes=[SyncMode.full_refresh])

        if self.supports_incremental:
            stream.source_defined_cursor = self.source_defined_cursor
            stream.supported_sync_modes.append(SyncMode.incremental)
            stream.default_cursor_field = self._wrapped_cursor_field()

        return stream

    @property
    def supports_incremental(self) -> bool:
        """
        :return: True if this stream supports incrementally reading data, i.e. it declares a non-empty cursor field
        """
        return len(self._wrapped_cursor_field()) > 0

    def _wrapped_cursor_field(self) -> List[str]:
        """Return the cursor field as a list, wrapping it when it is given as a single string."""
        return [self.cursor_field] if isinstance(self.cursor_field, str) else self.cursor_field

    @property
    def cursor_field(self) -> Union[str, List[str]]:
        """
        Override to return the default cursor field used by this stream e.g: an API entity might always use created_at as the cursor field.
        :return: The name of the field used as a cursor. If the cursor is nested, return an array consisting of the path to the cursor.
        """
        return []

    @property
    def source_defined_cursor(self) -> bool:
        """
        Return False if the cursor can be configured by the user.
        """
        return True

    def stream_slices(
        self,
        sync_mode: SyncMode,
        cursor_field: Optional[List[str]] = None,
        stream_state: Optional[Mapping[str, Any]] = None,
    ) -> Iterable[Optional[Mapping[str, Any]]]:
        """
        Override to define the slices for this stream. See the stream slicing section of the docs for more information.

        :param sync_mode: The sync mode the stream is being read with.
        :param cursor_field: The cursor field configured for the read, if any.
        :param stream_state: The state of this stream, if any.
        :return: The slices to read. The default is a single slice whose value is None.
        """
        return [None]

    @property
    def state_checkpoint_interval(self) -> Optional[int]:
        """
        Decides how often to checkpoint state (i.e: emit a STATE message). E.g: if this returns a value of 100, then state is persisted after reading
        100 records, then 200, 300, etc.. A good default value is 1000 although your mileage may vary depending on the underlying data source.

        Checkpointing a stream avoids re-reading records in the case a sync is failed or cancelled.

        return None if state should not be checkpointed e.g: because records returned from the underlying data source are not returned in
        ascending order with respect to the cursor field. This can happen if the source does not support reading records in ascending order of
        created_at date (or whatever the cursor is). In those cases, state must only be saved once the full stream has been read.
        """
        return None

    def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]:
        """
        Override to extract state from the latest record. Needed to implement incremental sync.

        Inspects the latest record extracted from the data source and the current state object and return an updated state object.

        For example: if the state object is based on created_at timestamp, and the current state is {'created_at': 10}, and the latest_record is
        {'name': 'octavia', 'created_at': 20 } then this method would return {'created_at': 20} to indicate state should be updated to this object.

        :param current_stream_state: The stream's current state object
        :param latest_record: The latest record extracted from the stream
        :return: An updated state object. The default implementation returns an empty state.
        """
        return {}

View File

@@ -0,0 +1,51 @@
# MIT License
#
# Copyright (c) 2020 Airbyte
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
from typing import Union
import requests
class BaseBackoffException(requests.exceptions.HTTPError):
    """Common parent of the backoff exceptions defined alongside it; adds nothing to requests' HTTPError."""
class UserDefinedBackoffException(BaseBackoffException):
    """
    Backoff exception that exposes, through its ``backoff`` attribute, how long to back off for.
    """

    def __init__(self, backoff: Union[int, float], request: requests.PreparedRequest, response: requests.Response):
        """
        :param backoff: number of seconds to back off for
        :param request: the request that led to this backoff exception
        :param response: the response that led to this backoff exception
        """
        # The backoff duration is stored before delegating to the parent constructor, which receives request and response.
        self.backoff = backoff
        super().__init__(
            request=request,
            response=response,
        )
class DefaultBackoffException(BaseBackoffException):
    """Backoff exception that carries no explicit backoff duration."""

View File

@@ -0,0 +1,234 @@
# MIT License
#
# Copyright (c) 2020 Airbyte
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
from abc import ABC, abstractmethod
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional
import requests
from airbyte_cdk.models import SyncMode
from airbyte_cdk.base_python.cdk.streams.auth.core import HttpAuthenticator, NoAuth
from airbyte_cdk.base_python.cdk.streams.core import Stream
from airbyte_cdk.base_python.cdk.streams.exceptions import DefaultBackoffException, UserDefinedBackoffException
from airbyte_cdk.base_python.cdk.streams.rate_limiting import default_backoff_handler, user_defined_backoff_handler
class HttpStream(Stream, ABC):
"""
Base abstract class for an Airbyte Stream using the HTTP protocol. Basic building block for users building an Airbyte source for a HTTP API.
"""
source_defined_cursor = True # Most HTTP streams use a source defined cursor (i.e: the user can't configure it like on a SQL table)
def __init__(self, authenticator: HttpAuthenticator = NoAuth()):
    """
    :param authenticator: The authenticator exposed through the ``authenticator`` property. The default NoAuth
        instance is created once, when this method is defined, and is shared by every stream relying on the default.
    """
    self._session = requests.Session()
    self._authenticator = authenticator
@property
@abstractmethod
def url_base(self) -> str:
    """
    :return: The base URL of the API endpoint, to which the value returned by path() is appended.
        E.g: to hit https://myapi.com/v1/some_entity this should return "https://myapi.com/v1/"
    """
@property
def http_method(self) -> str:
    """
    The HTTP method used for requests; "GET" unless overridden. See get_request_data if using POST.
    """
    default_method = "GET"
    return default_method
@property
def authenticator(self) -> HttpAuthenticator:
    """
    :return: The authenticator this stream was constructed with.
    """
    return self._authenticator
@abstractmethod
def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
    """
    Implement this method to define the pagination strategy.

    The returned value is handed to most other methods of this class, where it can be used to shape the next request
    e.g: by setting headers or query params.

    :param response: The response from which the next page token is derived.
    :return: The token for the page following the input response. None means there are no more pages to read.
    """
@abstractmethod
def path(
    self,
    stream_state: Mapping[str, Any] = None,
    stream_slice: Mapping[str, Any] = None,
    next_page_token: Mapping[str, Any] = None,
) -> str:
    """
    :return: The URL path of the API endpoint, which is appended to url_base.
        E.g: to hit https://myapi.com/v1/some_entity this should return "some_entity"
    """
def request_params(
    self,
    stream_state: Mapping[str, Any],
    stream_slice: Mapping[str, Any] = None,
    next_page_token: Mapping[str, Any] = None,
) -> MutableMapping[str, Any]:
    """
    Override to supply the query parameters of an outgoing HTTP request for the given inputs,
    e.g: paging parameters whenever next_page_token is not None.

    :return: The query parameters; the default implementation supplies none.
    """
    no_params = {}
    return no_params
def request_headers(
    self,
    stream_state: Mapping[str, Any],
    stream_slice: Mapping[str, Any] = None,
    next_page_token: Mapping[str, Any] = None,
) -> Mapping[str, Any]:
    """
    Override to supply any non-auth headers. Where they overlap, authentication headers take precedence over the headers
    returned here.

    :return: The non-auth headers; the default implementation supplies none.
    """
    no_headers = {}
    return no_headers
def request_body_json(
    self,
    stream_state: Mapping[str, Any],
    stream_slice: Mapping[str, Any] = None,
    next_page_token: Mapping[str, Any] = None,
) -> Optional[Mapping]:
    """
    TODO make this possible to do for non-JSON APIs
    Override when creating POST requests to populate the body of the request with a JSON payload.
    The returned value is only attached to the request when http_method is "POST"; for other methods it is ignored.

    :param stream_state: state passed to read_records (an empty dict when none was given)
    :param stream_slice: slice passed to read_records
    :param next_page_token: value returned by next_page_token() for the previous response; not passed for the first request
    :return: the JSON payload, or None (the default) for no payload
    """
    return None
@abstractmethod
def parse_response(
    self,
    response: requests.Response,
    stream_state: Mapping[str, Any],
    stream_slice: Mapping[str, Any] = None,
    next_page_token: Mapping[str, Any] = None,
) -> Iterable[Mapping]:
    """
    Parses the raw response object into a list of records.
    This method is abstract: there is no default behavior, every subclass must implement it.
    read_records yields each element of the returned iterable as one record.

    :param response: the response returned by _send_request
    :param stream_state: state passed to read_records (an empty dict when none was given)
    :param stream_slice: slice passed to read_records
    :param next_page_token: value returned by next_page_token() for the previous response; not passed for the first request
    :return: An iterable containing the parsed response
    """
# TODO move all the retry logic to a functor/decorator which is input as an init parameter
def should_retry(self, response: requests.Response) -> bool:
    """
    Decide whether the request that produced `response` should be retried after backing off.
    Override to retry on a different set of responses.

    The default retries on:
     - 429 (Too Many Requests), i.e. rate limiting
     - any 5XX status, to ride out transient server errors

    Unexpected but transient exceptions (connection timeout, DNS resolution failed, etc..) are retried by default.

    :param response: the response received from the server
    :return: True when the request should be retried
    """
    status = response.status_code
    if status == 429:
        return True
    return 500 <= status < 600
def backoff_time(self, response: requests.Response) -> Optional[float]:
    """
    Override this method to dynamically determine backoff time e.g: by reading the X-Retry-After header.

    This method is called only if should_retry() returns True for the input response.

    :param response: the response that triggered the retry
    :return how long to backoff in seconds. The return value may be a floating point number for subsecond precision. Returning None
        (or any other falsy value, such as 0) defers backoff to the default backoff behavior (e.g using an exponential algorithm).
    """
    return None
def _create_prepared_request(
    self, path: str, headers: Mapping = None, params: Mapping = None, json: Any = None
) -> requests.PreparedRequest:
    """
    Build the prepared request for `url_base + path` using this stream's HTTP method.

    :param path: path appended to url_base as-is
    :param headers: headers to send
    :param params: query parameters to send
    :param json: JSON payload; only attached when the HTTP method is POST
    :return: the prepared request, ready to be sent through the session
    """
    request_kwargs = dict(method=self.http_method, url=self.url_base + path, headers=headers, params=params)
    sends_body = self.http_method.upper() == "POST"
    if sends_body:
        # TODO support non-json bodies
        request_kwargs["json"] = json
    return requests.Request(**request_kwargs).prepare()
# TODO allow configuring these parameters. If we can get this into the requests library, then we can do it without the ugly exception hacks
# see https://github.com/litl/backoff/pull/122
@default_backoff_handler(max_tries=5, factor=5)
@user_defined_backoff_handler(max_tries=5)
def _send_request(self, request: requests.PreparedRequest) -> requests.Response:
    """
    Send `request` through the session, signalling the backoff decorators when a retry is needed.

    A retry is requested by raising an exception that one of the two decorators handles; anything they do not handle
    propagates and fails the sync.

    When should_retry() accepts the response, the exception type selects the wait strategy:
        1. UserDefinedBackoffException carries the value returned by backoff_time()
        2. DefaultBackoffException leaves the wait to the decorator's default behavior e.g: exponential backoff

    Unexpected transient exceptions (e.g: timeout) use the default backoff parameters.
    Unexpected persistent exceptions are not handled and will cause the sync to fail.

    :param request: the prepared request to send
    :return: the response, once it is neither retryable nor an HTTP error
    """
    response = self._session.send(request)

    if not self.should_retry(response):
        # Raise any HTTP exceptions that happened in case there were unexpected ones
        # TODO handle ignoring errors
        response.raise_for_status()
        return response

    custom_backoff_time = self.backoff_time(response)
    if custom_backoff_time:
        raise UserDefinedBackoffException(backoff=custom_backoff_time, request=request, response=response)
    raise DefaultBackoffException(request=request, response=response)
def read_records(
    self,
    sync_mode: SyncMode,
    stream_slice: Optional[Mapping[str, Any]] = None,
    stream_state: Optional[Mapping[str, Any]] = None,
    cursor_field: List[str] = None,
) -> Iterable[Mapping[str, Any]]:
    """
    Request page after page and yield every record parsed from each response.

    Each iteration builds a request from path / request_headers (with the authenticator's headers merged on top) /
    request_params / request_body_json, sends it, yields the parsed records and asks next_page_token() whether to continue.

    :param sync_mode: not used by this implementation
    :param stream_slice: forwarded to the request-building and parsing hooks
    :param stream_state: forwarded to the same hooks; a missing or empty state is replaced by an empty dict
    :param cursor_field: not used by this implementation
    :return: generator over the parsed records of all pages
    """
    hook_kwargs = {"stream_state": stream_state or {}, "stream_slice": stream_slice}

    while True:
        url_path = self.path(**hook_kwargs)
        headers = dict(self.request_headers(**hook_kwargs), **self.authenticator.get_auth_header())
        query = self.request_params(**hook_kwargs)
        body = self.request_body_json(**hook_kwargs)
        request = self._create_prepared_request(path=url_path, headers=headers, params=query, json=body)

        response = self._send_request(request)
        yield from self.parse_response(response, **hook_kwargs)

        token = self.next_page_token(response)
        if not token:
            break
        hook_kwargs["next_page_token"] = token

    # Always return an empty generator just in case no records were ever yielded
    yield from []

View File

@@ -0,0 +1,86 @@
# MIT License
#
# Copyright (c) 2020 Airbyte
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
import sys
import time
import backoff
from airbyte_cdk.base_python.cdk.streams.exceptions import DefaultBackoffException, UserDefinedBackoffException
from airbyte_cdk.base_python.logger import AirbyteLogger
from requests import codes, exceptions
# Exception types that default_backoff_handler retries: the project's DefaultBackoffException plus the timeout and
# connection errors raised by `requests`.
TRANSIENT_EXCEPTIONS = (DefaultBackoffException, exceptions.ConnectTimeout, exceptions.ReadTimeout, exceptions.ConnectionError)
# TODO inject singleton logger?
# Module-level logger used by the callbacks of both backoff handlers in this module.
logger = AirbyteLogger()
def default_backoff_handler(max_tries: int, factor: int, **kwargs):
    """
    Build a decorator that retries the wrapped callable with exponential backoff whenever it raises one of TRANSIENT_EXCEPTIONS.

    :param max_tries: forwarded to backoff.on_exception as `max_tries`
    :param factor: forwarded to backoff.on_exception as `factor` (used by the exponential wait generator)
    :param kwargs: any further keyword arguments for backoff.on_exception
    :return: the decorator produced by backoff.on_exception
    """

    def should_give_up(exc):
        # If a non-rate-limiting related 4XX error makes it this far, it means it was unexpected and probably consistent, so we shouldn't back off
        response = exc.response
        if response is None:
            return False
        status = response.status_code
        if status == codes.too_many_requests or not 400 <= status < 500:
            return False
        logger.info(f"Giving up for returned HTTP status: {status}")
        return True

    def log_retry_attempt(details):
        # The handler runs while the exception is being handled, so it is still available from sys.exc_info()
        exc = sys.exc_info()[1]
        logger.info(str(exc))
        logger.info(f"Caught retryable error after {details['tries']} tries. Waiting {details['wait']} seconds then retrying...")

    return backoff.on_exception(
        backoff.expo,
        TRANSIENT_EXCEPTIONS,
        max_tries=max_tries,
        factor=factor,
        giveup=should_give_up,
        on_backoff=log_retry_attempt,
        jitter=None,
        **kwargs,
    )
def user_defined_backoff_handler(max_tries: int, **kwargs):
    """
    Build a decorator that retries the wrapped callable when it raises UserDefinedBackoffException, waiting for the
    duration carried by the exception.

    The backoff library itself is configured with a constant interval of 0; the actual wait happens in the on_backoff callback.

    :param max_tries: forwarded to backoff.on_exception as `max_tries`
    :param kwargs: any further keyword arguments for backoff.on_exception
    :return: the decorator produced by backoff.on_exception
    """

    def log_give_up(details):
        exc = sys.exc_info()[1]
        logger.error(f"Max retry limit reached. Request: {exc.request}, Response: {exc.response}")

    def sleep_on_ratelimit(details):
        exc = sys.exc_info()[1]
        if not isinstance(exc, UserDefinedBackoffException):
            return
        retry_after = exc.backoff
        logger.info(f"Retrying. Sleeping for {retry_after} seconds")
        time.sleep(retry_after + 1)  # extra second to cover any fractions of second

    return backoff.on_exception(
        backoff.constant,
        UserDefinedBackoffException,
        interval=0,  # skip waiting, we'll wait in on_backoff handler
        max_tries=max_tries,
        on_backoff=sleep_on_ratelimit,
        on_giveup=log_give_up,
        jitter=None,
        **kwargs,
    )

View File

@@ -0,0 +1 @@
# Initialize Utils Package

View File

@@ -0,0 +1,32 @@
# MIT License
#
# Copyright (c) 2020 Airbyte
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
import re
# https://stackoverflow.com/a/1176023
def camel_to_snake(s):
    """
    Convert a CamelCase or mixedCase identifier to snake_case.

    :param s: the identifier to convert
    :return: the lower-cased identifier with underscores inserted at word boundaries
    """
    # Pass 1: put an underscore in front of every capitalised word that follows another character ("aWord" -> "a_Word")
    with_word_breaks = re.sub("(.)([A-Z][a-z]+)", r"\1_\2", s)
    # Pass 2: split the remaining lowercase-or-digit / uppercase junctions ("tH" -> "t_H")
    with_all_breaks = re.sub("([a-z0-9])([A-Z])", r"\1_\2", with_word_breaks)
    return with_all_breaks.lower()

View File

@@ -0,0 +1,120 @@
# MIT License
#
# Copyright (c) 2020 Airbyte
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
import inspect
from abc import ABC, abstractmethod
from typing import Any, Callable, Dict, Generator, List, Mapping, Tuple
from airbyte_cdk.models import AirbyteStream, ConfiguredAirbyteCatalog, ConfiguredAirbyteStream, SyncMode
from .schema_helpers import ResourceSchemaLoader
def package_name_from_class(cls: object) -> str:
    """
    Return the top-level package of the module that defines `cls`.

    :param cls: class (or any object inspect.getmodule can map to a module)
    :return: the first component of the module's dotted name
    """
    dotted_name = inspect.getmodule(cls).__name__
    top_level, _, _ = dotted_name.partition(".")
    return top_level
class StreamStateMixin:
    """
    Default state-handling interface for clients.

    As written here no stream has state: stream_has_state() returns False and the getter/setter raise
    NotImplementedError. Clients with incremental streams override all three methods.
    """

    def get_stream_state(self, name: str) -> Any:
        """
        Get state of stream with corresponding name

        :param name: stream name
        :raises NotImplementedError: always, unless overridden
        """
        raise NotImplementedError

    def set_stream_state(self, name: str, state: Any):
        """
        Set state of stream with corresponding name

        :param name: stream name
        :param state: state to restore for that stream
        :raises NotImplementedError: always, unless overridden
        """
        raise NotImplementedError

    def stream_has_state(self, name: str) -> bool:
        """
        Tell if stream supports incremental sync

        :param name: stream name
        :return: False unless overridden
        """
        return False
class BaseClient(StreamStateMixin, ABC):
    """
    Base client for API.

    Every function defined on the class whose name starts with "stream__" is exposed as a stream named after the rest
    of the function name. Stream schemas are loaded through `schema_loader_class` from the client's top-level package.
    """

    # Loader class used to fetch the JSON schema of each stream; instantiated with the client's top-level package name
    schema_loader_class = ResourceSchemaLoader

    def __init__(self, **kwargs):
        top_level_package = package_name_from_class(self.__class__)
        self._schema_loader = self.schema_loader_class(top_level_package)
        self._stream_methods = self._enumerate_methods()

    def _enumerate_methods(self) -> Mapping[str, callable]:
        """Map each stream name to the bound `stream__<name>` method that reads it"""
        prefix = "stream__"
        functions = inspect.getmembers(self.__class__, predicate=inspect.isfunction)
        return {name[len(prefix) :]: getattr(self, name) for name, _ in functions if name.startswith(prefix)}

    @staticmethod
    def _get_fields_from_stream(stream: AirbyteStream) -> List[str]:
        """Names of the top-level properties declared in the stream's JSON schema"""
        properties = stream.json_schema.get("properties", {})
        return list(properties.keys())

    def _get_stream_method(self, name: str) -> Callable:
        """Look up the reader method of the stream, raising ValueError for an unknown stream"""
        reader = self._stream_methods.get(name)
        if reader:
            return reader
        raise ValueError(f"Client does not know how to read stream `{name}`")

    def read_stream(self, stream: AirbyteStream) -> Generator[Dict[str, Any], None, None]:
        """Yield records from stream, each converted to a plain dict"""
        reader = self._get_stream_method(stream.name)
        wanted_fields = self._get_fields_from_stream(stream)

        for record in reader(fields=wanted_fields):
            yield dict(record)

    @property
    def streams(self) -> Generator[AirbyteStream, None, None]:
        """List of available streams; streams with state additionally advertise incremental sync"""
        for stream_name in self._stream_methods:
            sync_modes = [SyncMode.full_refresh]
            has_state = bool(self.stream_has_state(stream_name))
            if has_state:
                sync_modes.append(SyncMode.incremental)

            yield AirbyteStream(
                name=stream_name,
                json_schema=self._schema_loader.get_schema(stream_name),
                supported_sync_modes=sync_modes,
                source_defined_cursor=has_state,
            )

    @abstractmethod
    def health_check(self) -> Tuple[bool, str]:
        """Check if service is up and running"""
def configured_catalog_from_client(client: BaseClient) -> ConfiguredAirbyteCatalog:
    """
    Helper to generate configured catalog for testing

    :param client: client whose `streams` are wrapped, one ConfiguredAirbyteStream per stream
    :return: catalog containing every stream of the client
    """
    configured_streams = []
    for stream in client.streams:
        configured_streams.append(ConfiguredAirbyteStream(stream=stream))
    return ConfiguredAirbyteCatalog(streams=configured_streams)

View File

@@ -0,0 +1,136 @@
# MIT License
#
# Copyright (c) 2020 Airbyte
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
import argparse
import importlib
import os.path
import sys
import tempfile
from airbyte_cdk.models import AirbyteMessage, Status, Type
from .integration import Source
from .logger import AirbyteLogger
# Module-level logger: AirbyteEntrypoint.start passes it to the source's spec/check/discover/read calls and logs the check outcome with it.
logger = AirbyteLogger()
class AirbyteEntrypoint(object):
    """Command-line front end that maps the `spec`, `check`, `discover` and `read` commands onto a Source."""

    def __init__(self, source: Source):
        self.source = source

    @staticmethod
    def _build_parser() -> argparse.ArgumentParser:
        """Assemble the argument parser, one sub-command per supported command."""
        # set up parent parsers
        shared_parent = argparse.ArgumentParser(add_help=False)
        root = argparse.ArgumentParser()
        commands = root.add_subparsers(title="commands", dest="command")

        # spec
        commands.add_parser("spec", help="outputs the json configuration specification", parents=[shared_parent])

        # check
        check = commands.add_parser("check", help="checks the config can be used to connect", parents=[shared_parent])
        check_required = check.add_argument_group("required named arguments")
        check_required.add_argument("--config", type=str, required=True, help="path to the json configuration file")

        # discover
        discover = commands.add_parser("discover", help="outputs a catalog describing the source's schema", parents=[shared_parent])
        discover_required = discover.add_argument_group("required named arguments")
        discover_required.add_argument("--config", type=str, required=True, help="path to the json configuration file")

        # read
        read = commands.add_parser("read", help="reads the source and outputs messages to STDOUT", parents=[shared_parent])
        read.add_argument("--state", type=str, required=False, help="path to the json-encoded state file")
        read_required = read.add_argument_group("required named arguments")
        read_required.add_argument("--config", type=str, required=True, help="path to the json configuration file")
        read_required.add_argument("--catalog", type=str, required=True, help="path to the catalog used to determine which data to read")

        return root

    def start(self, args):
        """
        Parse `args`, run the requested command against the source and print the resulting messages as JSON.

        Every successfully handled command ends the process with sys.exit(0).

        :param args: command-line arguments without the program name
        :raises Exception: when no command is given or the command is not recognised
        """
        # parse the args
        parsed_args = self._build_parser().parse_args(args)

        # execute
        cmd = parsed_args.command
        if not cmd:
            raise Exception("No command passed")

        # todo: add try catch for exceptions with different exit codes
        with tempfile.TemporaryDirectory() as temp_dir:
            if cmd == "spec":
                spec_message = AirbyteMessage(type=Type.SPEC, spec=self.source.spec(logger))
                print(spec_message.json(exclude_unset=True))
                sys.exit(0)

            # every remaining command needs the config, persisted into the temporary directory by configure()
            config = self.source.configure(self.source.read_config(parsed_args.config), temp_dir)

            if cmd == "check":
                check_result = self.source.check(logger, config)
                if check_result.status == Status.SUCCEEDED:
                    logger.info("Check succeeded")
                else:
                    logger.error("Check failed")

                status_message = AirbyteMessage(type=Type.CONNECTION_STATUS, connectionStatus=check_result)
                print(status_message.json(exclude_unset=True))
                sys.exit(0)
            elif cmd == "discover":
                discovered = self.source.discover(logger, config)
                print(AirbyteMessage(type=Type.CATALOG, catalog=discovered).json(exclude_unset=True))
                sys.exit(0)
            elif cmd == "read":
                configured_catalog = self.source.read_catalog(parsed_args.catalog)
                state = self.source.read_state(parsed_args.state)
                for message in self.source.read(logger, config, configured_catalog, state):
                    print(message.json(exclude_unset=True))
                sys.exit(0)
            else:
                raise Exception("Unexpected command " + cmd)
def launch(source, args):
    """
    Wrap `source` in an AirbyteEntrypoint and run it with the given command-line arguments.

    :param source: the source handed to AirbyteEntrypoint
    :param args: argument list handed to AirbyteEntrypoint.start (main() passes sys.argv[1:])
    """
    AirbyteEntrypoint(source).start(args)
def main():
    """
    Instantiate the source implementation selected through the environment and launch it with the process arguments.

    AIRBYTE_IMPL_MODULE names the module to import and AIRBYTE_IMPL_PATH the class inside it; both default to the base
    Source class.

    :raises Exception: when the instantiated object is not a Source
    """
    module_name = os.environ.get("AIRBYTE_IMPL_MODULE", Source.__module__)
    class_name = os.environ.get("AIRBYTE_IMPL_PATH", Source.__name__)
    implementation = getattr(importlib.import_module(module_name), class_name)

    # set up and run entrypoint
    source = implementation()

    if not isinstance(source, Source):
        raise Exception("Source implementation provided does not implement Source class!")

    cli_args = sys.argv[1:]
    launch(source, cli_args)

View File

@@ -0,0 +1,121 @@
# MIT License
#
# Copyright (c) 2020 Airbyte
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
import json
import os
import pkgutil
from collections import defaultdict
from typing import Dict, Generator
from airbyte_cdk.models import AirbyteCatalog, AirbyteConnectionStatus, AirbyteMessage, ConfiguredAirbyteCatalog, ConnectorSpecification
from .logger import AirbyteLogger
class AirbyteSpec(object):
    """Holder for the raw text of a connector specification."""

    def __init__(self, spec_string):
        self.spec_string = spec_string

    @staticmethod
    def from_file(file: str):
        """
        Read the whole file at path `file` and wrap its text in an AirbyteSpec.

        :param file: path of the specification file
        :return: AirbyteSpec whose spec_string is the file's content
        """
        with open(file) as handle:
            return AirbyteSpec(handle.read())
class Integration(object):
    """Base class of sources and destinations: config/catalog/state file handling plus the spec/check/discover contract."""

    # can be overridden to change an input config
    def configure(self, config: json, temp_dir: str) -> json:
        """
        Persist config in temporary directory to run the Source job

        :param config: the parsed configuration
        :param temp_dir: directory in which `config.json` is written
        :return: the config, unchanged
        """
        config_path = os.path.join(temp_dir, "config.json")
        self.write_config(config, config_path)
        return config

    @staticmethod
    def read_config(config_path: str) -> json:
        """
        :param config_path: path of a JSON file
        :return: the parsed content of the file
        """
        with open(config_path, "r") as file:
            contents = file.read()
        return json.loads(contents)

    @staticmethod
    def write_config(config: json, config_path: str):
        """
        Serialize `config` as JSON into the file at `config_path`, replacing any previous content.
        """
        with open(config_path, "w") as fh:
            fh.write(json.dumps(config))

    # can be overridden to change an input catalog
    def read_catalog(self, catalog_path: str) -> ConfiguredAirbyteCatalog:
        """
        :param catalog_path: path of the JSON file holding the configured catalog
        :return: the catalog parsed from that file
        """
        return ConfiguredAirbyteCatalog.parse_obj(self.read_config(catalog_path))

    # can be overridden to change an input state
    def read_state(self, state_path: str) -> Dict[str, any]:
        """
        Load the state file, if one was given.

        :param state_path: path of the JSON state file; a falsy value (None, "") means "no state"
        :return: defaultdict over the parsed state, so that reading a key that is not present yields (and stores) an empty dict
        """
        state_obj = {}
        if state_path:
            # The context manager closes the file right after reading instead of leaving the handle to the garbage collector
            with open(state_path, "r") as fh:
                state_obj = json.loads(fh.read())

        return defaultdict(dict, state_obj)

    def spec(self, logger: AirbyteLogger) -> ConnectorSpecification:
        """
        Returns the spec for this integration. The spec is a JSON-Schema object describing the required configurations (e.g: username and password)
        required to run this integration.

        The spec is read from the `spec.json` resource of the top-level package that defines the concrete class.
        """
        raw_spec = pkgutil.get_data(self.__class__.__module__.split(".")[0], "spec.json")
        return ConnectorSpecification.parse_obj(json.loads(raw_spec))

    def check(self, logger: AirbyteLogger, config: json) -> AirbyteConnectionStatus:
        """
        Tests if the input configuration can be used to successfully connect to the integration e.g: if a provided Stripe API token can be used to connect
        to the Stripe API.

        :raises Exception: always, unless overridden
        """
        raise Exception("Not Implemented")

    def discover(self, logger: AirbyteLogger, config: json) -> AirbyteCatalog:
        """
        Returns an AirbyteCatalog representing the available streams and fields in this integration. For example, given valid credentials to a
        Postgres database, returns an Airbyte catalog where each postgres table is a stream, and each table column is a field.

        :raises Exception: always, unless overridden
        """
        raise Exception("Not Implemented")
class Source(Integration):
    """Integration that can be read from; concrete sources override `read`."""

    def __init__(self):
        super().__init__()

    def read(
        self, logger: AirbyteLogger, config: json, catalog: ConfiguredAirbyteCatalog, state_path: Dict[str, any]
    ) -> Generator[AirbyteMessage, None, None]:
        """
        Returns a generator of the AirbyteMessages generated by reading the source with the given configuration, catalog, and state.

        NOTE(review): despite its name, `state_path` is annotated as a dict, and AirbyteEntrypoint passes the value
        returned by read_state() here, so this looks like the parsed state rather than a path. Confirm before renaming.

        :raises Exception: always, unless overridden
        """
        raise Exception("Not Implemented")
class Destination(Integration):
    """Integration subclass for destinations; adds nothing to Integration in the code shown here."""

    def __init__(self):
        super().__init__()

View File

@@ -0,0 +1,71 @@
# MIT License
#
# Copyright (c) 2020 Airbyte
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
import traceback
from airbyte_cdk.models import AirbyteLogMessage, AirbyteMessage
class AirbyteLogger:
    """Logger that prints every log line to stdout as a JSON-serialized AirbyteMessage of type LOG."""

    def __init__(self):
        self.valid_log_types = ["FATAL", "ERROR", "WARN", "INFO", "DEBUG", "TRACE"]

    def log_by_prefix(self, message, default_level):
        """
        Log `message` at the level named by its first word, or at `default_level` when it does not start with a level.

        When a level prefix is found it is removed and the remaining words are re-joined with single spaces;
        otherwise the message is logged untouched.
        """
        words = message.split()
        if words and words[0] in self.valid_log_types:
            self.log(words[0], " ".join(words[1:]))
        else:
            self.log(default_level, message)

    def log(self, level, message):
        """Print one LOG message with the given level and text"""
        record = AirbyteLogMessage(level=level, message=message)
        print(AirbyteMessage(type="LOG", log=record).json(exclude_unset=True))

    def fatal(self, message):
        self.log("FATAL", message)

    def exception(self, message):
        """Log `message` followed by the formatted traceback of the exception being handled, at ERROR level"""
        self.error(f"{message}\n{traceback.format_exc()}")

    def error(self, message):
        self.log("ERROR", message)

    def warn(self, message):
        self.log("WARN", message)

    def info(self, message):
        self.log("INFO", message)

    def debug(self, message):
        self.log("DEBUG", message)

    def trace(self, message):
        self.log("TRACE", message)

View File

@@ -0,0 +1,126 @@
# MIT License
#
# Copyright (c) 2020 Airbyte
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
import json
import os
import pkgutil
from typing import Dict
import pkg_resources
from jsonschema import RefResolver
class JsonSchemaResolver:
    """Helper class to expand $ref items in json schema"""

    def __init__(self, shared_schemas_path: str):
        self._shared_refs = self._load_shared_schema_refs(shared_schemas_path)

    @staticmethod
    def _load_shared_schema_refs(shared_schemas_path: str):
        """Parse every regular file directly inside the folder as JSON and return them keyed by file name"""
        file_names = []
        for entry in os.scandir(shared_schemas_path):
            if entry.is_file():
                file_names.append(entry.name)

        refs_by_file = {}
        for file_name in file_names:
            with open(os.path.join(shared_schemas_path, file_name)) as data_file:
                refs_by_file[file_name] = json.load(data_file)
        return refs_by_file

    def _resolve_schema_references(self, schema: dict, resolver: RefResolver) -> dict:
        """
        Replace, in place, every $ref found in the schema itself and under its properties, patternProperties, items and anyOf.

        :return: the same `schema` object that was passed in
        """
        if "$ref" in schema:
            target = schema.pop("$ref", None)
            # resolver.resolve returns a pair; the second element is the referenced schema
            schema.update(resolver.resolve(target)[1])
            # the merged-in content may itself contain references, so start over on the same dict
            return self._resolve_schema_references(schema, resolver)

        for container in ("properties", "patternProperties"):
            if container in schema:
                children = schema[container]
                for key, child in children.items():
                    children[key] = self._resolve_schema_references(child, resolver)

        if "items" in schema:
            schema["items"] = self._resolve_schema_references(schema["items"], resolver)

        if "anyOf" in schema:
            alternatives = schema["anyOf"]
            for index, alternative in enumerate(alternatives):
                alternatives[index] = self._resolve_schema_references(alternative, resolver)

        return schema

    def resolve(self, schema: dict, refs: Dict[str, dict] = None) -> dict:
        """Resolves and replaces json-schema $refs with the appropriate dict.

        Walks the given schema dict recursively and swaps each $ref it encounters for the schema it points to.
        The shared schemas loaded by the constructor are always available as reference targets; entries of `refs`
        take precedence over shared schemas of the same name.

        This modifies the input schema and also returns it.

        Arguments:
            schema:
                the schema dict
            refs:
                optional dict mapping a reference name to its schema dict, forming a store of referenced schemata

        Returns:
            schema
        """
        store = dict(self._shared_refs)
        store.update(refs or {})
        return self._resolve_schema_references(schema, RefResolver("", schema, store=store))
class ResourceSchemaLoader:
    """JSONSchema loader from package resources"""

    def __init__(self, package_name: str):
        self.package_name = package_name

    def get_schema(self, name: str) -> dict:
        """
        Load the JSON schema called `name` from the "schemas/" folder of the package.

        Top-level schemas (one per stream) are expected directly inside "schemas/", while definitions shared through
        $ref live in "schemas/shared/". For example:

        schemas/shared/SHARED_DEFINITION.json
        schemas/NAME.json    # contains a $ref to the shared definition
        schemas/NAME2.json   # contains a $ref to the shared definition

        When the shared folder exists, the $refs of the schema are expanded before it is returned.

        :param name: schema name, i.e. the file name without the ".json" extension
        :return: the schema as a dict
        :raises IOError: when the schema resource is missing or empty
        :raises ValueError: when the resource is not valid JSON
        """
        schema_filename = f"schemas/{name}.json"
        raw_file = pkgutil.get_data(self.package_name, schema_filename)
        if not raw_file:
            raise IOError(f"Cannot find file {schema_filename}")

        try:
            raw_schema = json.loads(raw_file)
        except ValueError:
            # TODO use proper logging
            print(f"Invalid JSON file format for file {schema_filename}")
            raise

        shared_schemas_folder = pkg_resources.resource_filename(self.package_name, "schemas/shared/")
        if not os.path.exists(shared_schemas_folder):
            return raw_schema
        return JsonSchemaResolver(shared_schemas_folder).resolve(raw_schema)

View File

@@ -0,0 +1,116 @@
# MIT License
#
# Copyright (c) 2020 Airbyte
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
import copy
from datetime import datetime
from typing import Any, Iterator, Mapping, MutableMapping, Type
from airbyte_cdk.models import (
AirbyteCatalog,
AirbyteConnectionStatus,
AirbyteMessage,
AirbyteRecordMessage,
AirbyteStateMessage,
ConfiguredAirbyteCatalog,
ConfiguredAirbyteStream,
Status,
SyncMode,
)
from airbyte_cdk.models import Type as MessageType
from .client import BaseClient
from .integration import Source
from .logger import AirbyteLogger
class BaseSource(Source):
    """Base source that designed to work with clients derived from BaseClient"""

    # BaseClient subclass instantiated by _get_client; concrete sources must set it
    client_class: Type[BaseClient] = None

    @property
    def name(self) -> str:
        """Source name"""
        return self.__class__.__name__

    def _get_client(self, config: Mapping):
        """Construct client, passing every config entry as a keyword argument"""
        client = self.client_class(**config)
        return client

    def discover(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> AirbyteCatalog:
        """Discover streams: the catalog lists every stream exposed by the client"""
        client = self._get_client(config)
        return AirbyteCatalog(streams=[stream for stream in client.streams])

    def check(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> AirbyteConnectionStatus:
        """Check connection: FAILED with the client's error text when its health check fails, SUCCEEDED otherwise"""
        client = self._get_client(config)
        alive, error = client.health_check()
        if not alive:
            return AirbyteConnectionStatus(status=Status.FAILED, message=str(error))

        return AirbyteConnectionStatus(status=Status.SUCCEEDED)

    def read(
        self, logger: AirbyteLogger, config: Mapping[str, Any], catalog: ConfiguredAirbyteCatalog, state: MutableMapping[str, Any] = None
    ) -> Iterator[AirbyteMessage]:
        """
        Read every stream of the configured catalog, in catalog order.

        :param logger: logger for progress and failure messages
        :param config: configuration used to construct the client
        :param catalog: the streams to read
        :param state: previously saved state keyed by stream name; it is deep-copied, the caller's object is not modified
        :return: iterator over the record and state messages of all streams
        """
        state = state or {}
        client = self._get_client(config)

        logger.info(f"Starting syncing {self.name}")
        # Work on a copy: _read_stream updates this mapping as streams report new state
        total_state = copy.deepcopy(state)
        for configured_stream in catalog.streams:
            try:
                yield from self._read_stream(logger=logger, client=client, configured_stream=configured_stream, state=total_state)
            except Exception:
                # Name the stream that failed (not the source) so the failure can be located
                logger.exception(f"Encountered an exception while reading stream {configured_stream.stream.name}")
                raise

        logger.info(f"Finished syncing {self.name}")

    def _read_stream(
        self, logger: AirbyteLogger, client: BaseClient, configured_stream: ConfiguredAirbyteStream, state: MutableMapping[str, Any]
    ):
        """
        Yield one RECORD message per record of the stream, followed by a STATE message when the stream is read
        incrementally and the client reports a state for it.

        :param state: state of all streams; updated in place with this stream's new state
        """
        stream_name = configured_stream.stream.name
        use_incremental = configured_stream.sync_mode == SyncMode.incremental and client.stream_has_state(stream_name)

        if use_incremental and state.get(stream_name):
            logger.info(f"Set state of {stream_name} stream to {state.get(stream_name)}")
            client.set_stream_state(stream_name, state.get(stream_name))

        logger.info(f"Syncing {stream_name} stream")
        for record in client.read_stream(configured_stream.stream):
            # Epoch milliseconds; scale before truncating so the sub-second part is kept
            now = int(datetime.now().timestamp() * 1000)
            message = AirbyteRecordMessage(stream=stream_name, data=record, emitted_at=now)
            yield AirbyteMessage(type=MessageType.RECORD, record=message)

        if use_incremental and client.get_stream_state(stream_name):
            state[stream_name] = client.get_stream_state(stream_name)
            # output state object only together with other stream states
            yield AirbyteMessage(type=MessageType.STATE, state=AirbyteStateMessage(data=state))