106 lines
4.7 KiB
Python
106 lines
4.7 KiB
Python
#
|
|
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
|
|
#
|
|
|
|
|
|
import json
|
|
from datetime import datetime
|
|
from typing import Any, Dict, Generator, Mapping
|
|
|
|
from airbyte_cdk.logger import AirbyteLogger
|
|
from airbyte_cdk.models import (
|
|
AirbyteCatalog,
|
|
AirbyteConnectionStatus,
|
|
AirbyteMessage,
|
|
AirbyteRecordMessage,
|
|
AirbyteStream,
|
|
ConfiguredAirbyteCatalog,
|
|
Status,
|
|
Type,
|
|
)
|
|
from airbyte_cdk.sources import Source
|
|
|
|
|
|
class SourceScaffoldSourcePython(Source):
    """
    Scaffold source connector.

    Every method returns a hard-coded example value; the spots where real connector logic belongs are
    marked with "Not Implemented" comments.
    """

    def check(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> AirbyteConnectionStatus:
        """
        Tests if the input configuration can be used to successfully connect to the integration
        e.g: if a provided Stripe API token can be used to connect to the Stripe API.

        :param logger: Logging object to display debug/info/error to the logs
            (logs will not be accessible via airbyte UI if they are not passed to this logger)
        :param config: Json object containing the configuration of this source, content of this json is as specified in
            the properties of the spec.yaml file

        :return: AirbyteConnectionStatus indicating a Success or Failure
        """
        try:
            # Not Implemented
            # Placeholder for the actual connectivity probe. Anything it raises is converted
            # into a FAILED status by the handler below instead of propagating to the caller.

            return AirbyteConnectionStatus(status=Status.SUCCEEDED)
        except Exception as e:
            return AirbyteConnectionStatus(status=Status.FAILED, message=f"An exception occurred: {str(e)}")

    def discover(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> AirbyteCatalog:
        """
        Returns an AirbyteCatalog representing the available streams and fields in this integration.
        For example, given valid credentials to a Postgres database,
        returns an Airbyte catalog where each postgres table is a stream, and each table column is a field.

        :param logger: Logging object to display debug/info/error to the logs
            (logs will not be accessible via airbyte UI if they are not passed to this logger)
        :param config: Json object containing the configuration of this source, content of this json is as specified in
            the properties of the spec.yaml file

        :return: AirbyteCatalog is an object describing a list of all available streams in this source.
            A stream is an AirbyteStream object that includes:
            - its stream name (or table name in the case of Postgres)
            - json_schema providing the specifications of expected schema for this stream (a list of columns described
            by their names and types)
        """
        streams = []

        stream_name = "TableName"  # Example
        json_schema = {  # Example
            "$schema": "http://json-schema.org/draft-07/schema#",
            "type": "object",
            "properties": {"columnName": {"type": "string"}},
        }

        # Not Implemented

        streams.append(AirbyteStream(name=stream_name, json_schema=json_schema))
        return AirbyteCatalog(streams=streams)

    def read(
        self, logger: AirbyteLogger, config: Mapping[str, Any], catalog: ConfiguredAirbyteCatalog, state: Dict[str, Any]
    ) -> Generator[AirbyteMessage, None, None]:
        """
        Returns a generator of the AirbyteMessages generated by reading the source with the given configuration,
        catalog, and state.

        :param logger: Logging object to display debug/info/error to the logs
            (logs will not be accessible via airbyte UI if they are not passed to this logger)
        :param config: Json object containing the configuration of this source, content of this json is as specified in
            the properties of the spec.yaml file
        :param catalog: The input catalog is a ConfiguredAirbyteCatalog which is almost the same as AirbyteCatalog
            returned by discover(), but
            in addition, it's been configured in the UI! For each particular stream and field, there may have been provided
            with extra modifications such as: filtering streams and/or columns out, renaming some entities, etc
        :param state: When Airbyte reads data from a source, it might need to keep a checkpoint cursor to resume
            replication in the future from that saved checkpoint.
            This is the object that is provided with state from previous runs and avoid replicating the entire set of
            data every time.

        :return: A generator that produces a stream of AirbyteRecordMessage contained in AirbyteMessage object.
        """
        stream_name = "TableName"  # Example
        data = {"columnName": "Hello World"}  # Example

        # Not Implemented

        # Scale to milliseconds BEFORE truncating to int; truncating first would drop the
        # sub-second part and leave every timestamp ending in 000.
        emitted_at = int(datetime.now().timestamp() * 1000)

        yield AirbyteMessage(
            type=Type.RECORD,
            record=AirbyteRecordMessage(stream=stream_name, data=data, emitted_at=emitted_at),
        )
|