Template generation for a new Source using the Santa CDK: provides basic scaffolding for someone implementing a new source. The general approach is to expand the comments in the original SDK and to add TODOs with secondary comments in the generated stub methods, along with links to existing examples (e.g. the Stripe or ExchangeRate API) that users can look at. The generated modules are checked in, with tests added for them.
client
package_name_from_class
package_name_from_class(cls: object) -> str
Find the package name given a class name
StreamStateMixin Objects
class StreamStateMixin()
get_stream_state
| get_stream_state(name: str) -> Any
Get state of stream with corresponding name
set_stream_state
| set_stream_state(name: str, state: Any)
Set state of stream with corresponding name
stream_has_state
| stream_has_state(name: str) -> bool
Tell if stream supports incremental sync
BaseClient Objects
class BaseClient(StreamStateMixin, ABC)
Base client for API
read_stream
| read_stream(stream: AirbyteStream) -> Generator[Dict[str, Any], None, None]
Yield records from stream
streams
| @property
| streams() -> Generator[AirbyteStream, None, None]
List of available streams
health_check
| @abstractmethod
| health_check() -> Tuple[bool, str]
Check if service is up and running
configured_catalog_from_client
configured_catalog_from_client(client: BaseClient) -> ConfiguredAirbyteCatalog
Helper to generate configured catalog for testing
__init__
entrypoint
logger
integration
Integration Objects
class Integration(object)
configure
| configure(config: json, temp_dir: str) -> json
Persist config in temporary directory to run the Source job
spec
| spec(logger: AirbyteLogger) -> ConnectorSpecification
Returns the spec for this integration. The spec is a JSON-Schema object describing the configuration (e.g. username and password) required to run this integration.
check
| check(logger: AirbyteLogger, config: json) -> AirbyteConnectionStatus
Tests whether the input configuration can be used to successfully connect to the integration, e.g. whether a provided Stripe API token can be used to connect to the Stripe API.
discover
| discover(logger: AirbyteLogger, config: json) -> AirbyteCatalog
Returns an AirbyteCatalog representing the available streams and fields in this integration. For example, given valid credentials to a Postgres database, returns an Airbyte catalog where each postgres table is a stream, and each table column is a field.
Source Objects
class Source(Integration)
read
| read(logger: AirbyteLogger, config: json, catalog: ConfiguredAirbyteCatalog, state_path: Dict[str, any]) -> Generator[AirbyteMessage, None, None]
Returns a generator of the AirbyteMessages generated by reading the source with the given configuration, catalog, and state.
catalog_helpers
CatalogHelper Objects
class CatalogHelper()
coerce_catalog_as_full_refresh
| @staticmethod
| coerce_catalog_as_full_refresh(catalog: AirbyteCatalog) -> AirbyteCatalog
Updates the sync mode on all streams in this catalog to be full refresh
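As a rough illustration of what coercing a catalog to full refresh might involve, the sketch below works on plain dicts rather than the real AirbyteCatalog model. The field names (supported_sync_modes, source_defined_cursor, default_cursor_field) and the choice to return a copy instead of mutating the input are assumptions made for this example, not a description of the helper's actual code.

```python
import copy
from typing import Any, Dict


def coerce_catalog_as_full_refresh(catalog: Dict[str, Any]) -> Dict[str, Any]:
    """Return a copy of a dict-shaped catalog where every stream is full refresh only."""
    coerced = copy.deepcopy(catalog)  # assumption: leave the caller's catalog untouched
    for stream in coerced.get("streams", []):
        stream["supported_sync_modes"] = ["full_refresh"]
        stream["source_defined_cursor"] = False
        stream["default_cursor_field"] = None
    return coerced
```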
schema_helpers
JsonSchemaResolver Objects
class JsonSchemaResolver()
Helper class to expand $ref items in json schema
resolve
| resolve(schema: dict, refs: Dict[str, dict] = None) -> dict
Resolves and replaces json-schema $refs with the appropriate dict. Recursively walks the given schema dict, converting every instance of $ref in a 'properties' structure with a resolved dict. This modifies the input schema and also returns it.
Arguments:
schema: the schema dict
refs: a dict of <string, dict> which forms a store of referenced schemata
Returns:
schema
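The recursive walk described for resolve can be sketched as follows. This is a minimal illustration, not the CDK's implementation: the function name resolve_refs is hypothetical, refs is assumed to be keyed by the exact $ref string, and each stored definition is deep-copied so the store itself is never mutated.

```python
import copy
from typing import Dict, Optional


def resolve_refs(schema: dict, refs: Optional[Dict[str, dict]] = None) -> dict:
    """Replace {"$ref": key} entries under "properties" with refs[key], in place."""
    refs = refs or {}
    properties = schema.get("properties")
    if isinstance(properties, dict):
        for name, value in list(properties.items()):
            if isinstance(value, dict) and "$ref" in value:
                # Swap the reference for a private copy of the stored definition.
                value = copy.deepcopy(refs[value["$ref"]])
                properties[name] = value
            if isinstance(value, dict):
                # Nested objects may carry their own "properties" with more $refs.
                resolve_refs(value, refs)
    return schema  # the same object that was passed in, now modified
```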
ResourceSchemaLoader Objects
class ResourceSchemaLoader()
JSONSchema loader from package resources
get_schema
| get_schema(name: str) -> dict
This method retrieves a JSON schema from the schemas/ folder.
The expected file structure is to have all top-level schemas (corresponding to streams) in the "schemas/" folder, with any shared $refs living inside the "schemas/shared/" folder. For example:
schemas/shared/<shared_definition>.json
schemas/.json # contains a $ref to shared_definition
schemas/.json # contains a $ref to shared_definition
source
BaseSource Objects
class BaseSource(Source)
Base source designed to work with clients derived from BaseClient
name
| @property
| name() -> str
Source name
discover
| discover(logger: AirbyteLogger, config: Mapping[str, Any]) -> AirbyteCatalog
Discover streams
check
| check(logger: AirbyteLogger, config: Mapping[str, Any]) -> AirbyteConnectionStatus
Check connection
cdk
cdk.streams
cdk.streams.rate_limiting
cdk.streams.auth
cdk.streams.auth.token
cdk.streams.auth.core
HttpAuthenticator Objects
class HttpAuthenticator(ABC)
Base abstract class for various HTTP Authentication strategies. Authentication strategies are generally expected to provide security credentials via HTTP headers.
get_auth_header
| @abstractmethod
| get_auth_header() -> Mapping[str, Any]
Returns:
A dictionary containing all the necessary headers to authenticate.
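To make the header-based contract concrete, here is a small sketch of one possible strategy. The HttpAuthenticator class below is a local stand-in that mirrors the documented interface so the example runs on its own, and BearerTokenAuthenticator is a hypothetical name chosen for illustration.

```python
from abc import ABC, abstractmethod
from typing import Any, Mapping


class HttpAuthenticator(ABC):
    """Local stand-in mirroring the documented interface (not imported from the CDK)."""

    @abstractmethod
    def get_auth_header(self) -> Mapping[str, Any]:
        ...


class BearerTokenAuthenticator(HttpAuthenticator):
    """Hypothetical strategy: send a static token in the Authorization header."""

    def __init__(self, token: str):
        self._token = token

    def get_auth_header(self) -> Mapping[str, Any]:
        return {"Authorization": f"Bearer {self._token}"}
```

A stream would then merge the returned mapping into the headers of each outgoing request.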
cdk.streams.auth.oauth
Oauth2Authenticator Objects
class Oauth2Authenticator(HttpAuthenticator)
Generates OAuth2.0 access tokens from an OAuth2.0 refresh token and client credentials. The generated access token is attached to each request via the Authorization header.
get_refresh_request_body
| get_refresh_request_body() -> Mapping[str, any]
Override to define additional parameters
refresh_access_token
| refresh_access_token() -> Tuple[str, int]
Returns a tuple of (access_token, token_lifespan_in_seconds).
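One way to use the (access_token, token_lifespan_in_seconds) tuple is to cache the token and refresh it only after it expires. The sketch below assumes that approach; CachedOauthHeader, its refresh callable and its injectable clock are hypothetical names, and the real authenticator may manage expiry differently.

```python
import time
from typing import Callable, Mapping, Optional, Tuple


class CachedOauthHeader:
    """Caches an access token until its reported lifespan has elapsed."""

    def __init__(
        self,
        refresh: Callable[[], Tuple[str, int]],  # stands in for refresh_access_token
        clock: Callable[[], float] = time.time,  # injectable so expiry can be tested
    ):
        self._refresh = refresh
        self._clock = clock
        self._token: Optional[str] = None
        self._expires_at = 0.0

    def get_auth_header(self) -> Mapping[str, str]:
        if self._token is None or self._clock() >= self._expires_at:
            token, lifespan_seconds = self._refresh()
            self._token = token
            self._expires_at = self._clock() + lifespan_seconds
        return {"Authorization": f"Bearer {self._token}"}
```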
cdk.streams.auth.jwt
cdk.streams.core
package_name_from_class
package_name_from_class(cls: object) -> str
Find the package name given a class name
Stream Objects
class Stream(ABC)
Base abstract class for an Airbyte Stream. Makes no assumption of the Stream's underlying transport protocol.
name
| @property
| name() -> str
Returns:
Stream name. By default this is the implementing class name, but it can be overridden as needed.
read_records
| @abstractmethod
| read_records(sync_mode: SyncMode, cursor_field: List[str] = None, stream_slice: Mapping[str, any] = None, stream_state: Mapping[str, Any] = None) -> Iterable[Mapping[str, Any]]
This method should be overridden by subclasses to read records based on the inputs
get_json_schema
| get_json_schema() -> Mapping[str, Any]
Returns:
A dict of the JSON schema representing this stream.
The default implementation of this method looks for a JSONSchema file with the same name as this stream's "name" property. Override as needed.
supports_incremental
| @property
| supports_incremental() -> bool
Returns:
True if this stream supports incrementally reading data
cursor_field
| @property
| cursor_field() -> Union[str, List[str]]
Override to return the default cursor field used by this stream e.g: an API entity might always use created_at as the cursor field.
Returns:
The name of the field used as a cursor. If the cursor is nested, return an array consisting of the path to the cursor.
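Since cursor_field may be either a single name or a path into a nested record, code that reads the cursor has to handle both shapes. A minimal sketch, using a hypothetical helper named cursor_value:

```python
from typing import Any, List, Mapping, Union


def cursor_value(record: Mapping[str, Any], cursor_field: Union[str, List[str]]) -> Any:
    """Look up the cursor in a record, following a path when the cursor is nested."""
    path = [cursor_field] if isinstance(cursor_field, str) else list(cursor_field)
    value: Any = record
    for key in path:
        value = value[key]  # raises KeyError if the record lacks the cursor
    return value
```

For example, a cursor stored at record["meta"]["updated_at"] would be declared as ["meta", "updated_at"].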
source_defined_cursor
| @property
| source_defined_cursor() -> bool
Return False if the cursor can be configured by the user.
stream_slices
| stream_slices(sync_mode: SyncMode, cursor_field: List[str] = None, stream_state: Mapping[str, Any] = None) -> Iterable[Optional[Mapping[str, any]]]
Override to define the slices for this stream. See the stream slicing section of the docs for more information.
Arguments:
stream_state:
Returns:
state_checkpoint_interval
| @property
| state_checkpoint_interval() -> Optional[int]
Decides how often to checkpoint state (i.e. emit a STATE message). E.g. if this returns a value of 100, then state is persisted after reading 100 records, then 200, 300, etc. A good default value is 1000, although your mileage may vary depending on the underlying data source.
Checkpointing a stream avoids re-reading records if a sync fails or is cancelled.
Return None if state should not be checkpointed, e.g. because records returned from the underlying data source are not returned in ascending order with respect to the cursor field. This can happen if the source does not support reading records in ascending order of created_at date (or whatever the cursor is). In those cases, state must only be saved once the full stream has been read.
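The checkpointing behaviour can be pictured with a simplified read loop. This is only a sketch: read_with_checkpoints is a hypothetical function, messages are modelled as (kind, payload) tuples rather than real AirbyteMessages, and the cursor is assumed to be a top-level created_at field.

```python
from typing import Any, Dict, Iterable, Iterator, Mapping, Optional, Tuple

Message = Tuple[str, Mapping[str, Any]]


def read_with_checkpoints(
    records: Iterable[Mapping[str, Any]],
    interval: Optional[int],
    cursor: str = "created_at",
) -> Iterator[Message]:
    """Yield RECORD messages, with a STATE message every `interval` records."""
    state: Dict[str, Any] = {}
    count = 0
    for count, record in enumerate(records, start=1):
        # Track the largest cursor seen so far.
        state = {cursor: max(state.get(cursor, record[cursor]), record[cursor])}
        yield ("RECORD", record)
        if interval and count % interval == 0:
            yield ("STATE", dict(state))
    # Emit a final STATE once the stream is fully read, unless one was just emitted.
    if count and (not interval or count % interval != 0):
        yield ("STATE", dict(state))
```

With interval=None, the only STATE message comes after the last record, which matches the case where records are not returned in cursor order.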
get_updated_state
| get_updated_state(current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any])
Override to extract state from the latest record. Needed to implement incremental sync.
Inspects the latest record extracted from the data source and the current state object, and returns an updated state object.
For example: if the state object is based on created_at timestamp, and the current state is {'created_at': 10}, and the latest_record is {'name': 'octavia', 'created_at': 20 } then this method would return {'created_at': 20} to indicate state should be updated to this object.
Arguments:
current_stream_state: The stream's current state object
latest_record: The latest record extracted from the stream
Returns:
An updated state object
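The created_at example above could be implemented along these lines. It is written as a free function so it runs standalone; in a real stream it would be a method override. Taking the maximum of the two values is an assumption added here so that an out-of-order record never moves the state backwards.

```python
from typing import Any, Mapping, MutableMapping


def get_updated_state(
    current_stream_state: MutableMapping[str, Any],
    latest_record: Mapping[str, Any],
) -> Mapping[str, Any]:
    """Advance a created_at-based state using the latest record."""
    cursor = "created_at"  # assumed cursor field for this sketch
    latest = latest_record[cursor]
    current = current_stream_state.get(cursor, latest)
    return {cursor: max(current, latest)}
```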
cdk.streams.http
HttpStream Objects
class HttpStream(Stream, ABC)
Base abstract class for an Airbyte Stream using the HTTP protocol. Basic building block for users building an Airbyte source for an HTTP API.
url_base
| @property
| @abstractmethod
| url_base() -> str
Returns:
URL base for the API endpoint e.g: if you wanted to hit https://myapi.com/v1/some_entity then this should return "https://myapi.com/v1/"
http_method
| @property
| http_method() -> str
Override if needed. See request_body_json if using POST.
next_page_token
| @abstractmethod
| next_page_token(response: requests.Response) -> Optional[Mapping[str, Any]]
Override this method to define a pagination strategy.
The value returned from this method is passed to most other methods in this class. Use it to form a request e.g: set headers or query params.
Returns:
The token for the next page from the input response object. Returning None means there are no more pages to read in this response.
path
| @abstractmethod
| path(stream_state: Mapping[str, Any] = None, stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None) -> str
Returns the URL path for the API endpoint e.g: if you wanted to hit https://myapi.com/v1/some_entity then this should return "some_entity"
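The split between url_base and path matters mostly at the joining step. How the CDK combines the two is not shown in this reference; the sketch below assumes urllib.parse.urljoin purely for illustration, with build_url as a hypothetical helper. It also shows why the trailing slash in the url_base example is significant under that assumption.

```python
from urllib.parse import urljoin


def build_url(url_base: str, path: str) -> str:
    """Join a base URL and a relative path following standard URL resolution rules."""
    return urljoin(url_base, path)


# With the trailing slash, the path is appended under /v1/.
full = build_url("https://myapi.com/v1/", "some_entity")
# Without it, "v1" is treated as the last segment and gets replaced.
truncated = build_url("https://myapi.com/v1", "some_entity")
```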
request_params
| request_params(stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None) -> MutableMapping[str, Any]
Override this method to define the query parameters that should be set on an outgoing HTTP request given the inputs.
E.g: you might want to define query parameters for paging if next_page_token is not None.
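A common pairing of next_page_token and request_params is cursor-based paging. The sketch below assumes an API whose JSON body carries a next_cursor field and accepts cursor and limit query parameters; those names are invented for the example. To stay self-contained it takes the decoded body instead of a requests.Response, and the functions are standalone rather than method overrides.

```python
from typing import Any, Mapping, MutableMapping, Optional


def extract_next_page_token(body: Mapping[str, Any]) -> Optional[Mapping[str, Any]]:
    """Return the token for the next page, or None when there are no more pages."""
    cursor = body.get("next_cursor")  # hypothetical field name
    return {"cursor": cursor} if cursor else None


def request_params(
    stream_state: Mapping[str, Any],
    stream_slice: Optional[Mapping[str, Any]] = None,
    next_page_token: Optional[Mapping[str, Any]] = None,
) -> MutableMapping[str, Any]:
    """Build query parameters, adding the paging cursor when one is available."""
    params: MutableMapping[str, Any] = {"limit": 100}  # hypothetical page size
    if next_page_token:
        params.update(next_page_token)
    return params
```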
request_headers
| request_headers(stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None) -> Mapping[str, Any]
Override to return any non-auth headers. Authentication headers will overwrite any overlapping headers returned from this method.
request_body_json
| request_body_json(stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None) -> Optional[Mapping]
TODO: make this possible for non-JSON APIs.
Override when creating POST requests to populate the body of the request with a JSON payload.
parse_response
| @abstractmethod
| parse_response(response: requests.Response, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None) -> Iterable[Mapping]
Parses the raw response object into a list of records. By default, this returns an iterable containing the input. Override to parse differently.
Arguments:
response:
Returns:
An iterable containing the parsed response
should_retry
| should_retry(response: requests.Response) -> bool
Override to set different conditions for backoff based on the response from the server.
By default, back off on the following HTTP response statuses:
- 429 (Too Many Requests) indicating rate limiting
- 500s to handle transient server errors
Unexpected but transient exceptions (connection timeout, DNS resolution failure, etc.) are retried by default.
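The default status-code rule described above amounts to a small predicate. This sketch takes the status code directly instead of a requests.Response so that it runs without extra dependencies; it illustrates the rule and is not the CDK's code.

```python
def should_retry(status_code: int) -> bool:
    """Retry on 429 (rate limiting) and on any 5xx (transient server error)."""
    return status_code == 429 or 500 <= status_code < 600
```

An override might widen this, for instance to retry on a status code that a particular API uses for temporary lockouts.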
backoff_time
| backoff_time(response: requests.Response) -> Optional[float]
Override this method to dynamically determine backoff time e.g: by reading the X-Retry-After header.
This method is called only if should_retry() returns True for the input response.
Returns:
How long to back off, in seconds. The return value may be a floating point number for subsecond precision. Returning None defers backoff to the default backoff behavior (e.g. using an exponential algorithm).
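A header-driven backoff_time might look like the following. The sketch reads the X-Retry-After header mentioned above from a plain mapping rather than a requests.Response, and it assumes the header holds a number of seconds; anything unparseable falls back to None so that the default backoff applies.

```python
from typing import Mapping, Optional


def backoff_time(headers: Mapping[str, str]) -> Optional[float]:
    """Return the server-requested wait in seconds, or None to use the default backoff."""
    raw = headers.get("X-Retry-After")
    if raw is None:
        return None
    try:
        return max(float(raw), 0.0)  # never return a negative wait
    except ValueError:
        return None  # e.g. the header held a date instead of a number
```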
cdk.streams.exceptions
UserDefinedBackoffException Objects
class UserDefinedBackoffException(BaseBackoffException)
An exception that exposes how long it attempted to back off
__init__
| __init__(backoff: Union[int, float], request: requests.PreparedRequest, response: requests.Response)
Arguments:
backoff: how long to back off in seconds
request: the request that triggered this backoff exception
response: the response that triggered the backoff exception
cdk.utils
cdk.utils.casing
cdk.abstract_source
AbstractSource Objects
class AbstractSource(Source, ABC)
Abstract base class for an Airbyte Source. Consumers should implement any abstract methods in this class to create an Airbyte Specification compliant Source.
check_connection
| @abstractmethod
| check_connection(logger: AirbyteLogger, config: Mapping[str, Any]) -> Tuple[bool, Optional[Any]]
Arguments:
config: The user-provided configuration as specified by the source's spec. This usually contains information required to check connection e.g. tokens, secrets and keys etc.
Returns:
A tuple of (boolean, error). If boolean is true, then the connection check is successful and we can connect to the underlying data source using the provided configuration. Otherwise, the input config cannot be used to connect to the underlying data source, and the "error" object should describe what went wrong. The error object will be cast to string to display the problem to the user.
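The (boolean, error) contract can be sketched like this. The api_key config field and the probe callable are hypothetical: probe stands in for whatever cheap request a real source would make against its API, and the function is standalone rather than a method on a source class.

```python
from typing import Any, Callable, Mapping, Optional, Tuple


def check_connection(
    config: Mapping[str, Any],
    probe: Callable[[Mapping[str, Any]], None],
) -> Tuple[bool, Optional[Any]]:
    """Return (True, None) on success, or (False, error) describing the failure."""
    if not config.get("api_key"):  # hypothetical required field
        return False, "api_key is missing from the config"
    try:
        probe(config)  # expected to raise if the service cannot be reached
    except Exception as error:
        return False, error  # cast to string later when shown to the user
    return True, None
```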
streams
| @abstractmethod
| streams(config: Mapping[str, Any]) -> List[Stream]
Arguments:
config: The user-provided configuration as specified by the source's spec. Any stream construction related operation should happen here.
Returns:
A list of the streams in this source connector.
name
| @property
| name() -> str
Source name
discover
| discover(logger: AirbyteLogger, config: Mapping[str, Any]) -> AirbyteCatalog
Implements the Discover operation from the Airbyte Specification. See https://docs.airbyte.io/architecture/airbyte-specification.
check
| check(logger: AirbyteLogger, config: Mapping[str, Any]) -> AirbyteConnectionStatus
Implements the Check Connection operation from the Airbyte Specification. See https://docs.airbyte.io/architecture/airbyte-specification.
read
| read(logger: AirbyteLogger, config: Mapping[str, Any], catalog: ConfiguredAirbyteCatalog, state: MutableMapping[str, Any] = None) -> Iterator[AirbyteMessage]
Implements the Read operation from the Airbyte Specification. See https://docs.airbyte.io/architecture/airbyte-specification.