Building a Python Source for an HTTP API

Summary

This is a step-by-step guide for how to create an Airbyte source in Python to read data from an HTTP API. We'll be using the Exchangerates API as an example since it is simple and demonstrates a lot of the capabilities of the CDK.

Requirements

  • Python >= 3.7
  • Docker
  • NodeJS (only used to generate the connector). We'll remove the NodeJS dependency soon.

All the commands below assume that python points to a version of python >=3.7.0. On some systems, python points to a Python2 installation and python3 points to Python3. If this is the case on your machine, substitute all python commands in this guide with python3.
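If you're unsure which version a given python command resolves to, one quick way to check is from inside the interpreter itself (a generic sanity check, not part of the connector code):

import sys

# Verify the interpreter meets the tutorial's minimum version requirement.
assert sys.version_info >= (3, 7), f"Python >= 3.7 required, found {sys.version}"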

Checklist

  • Step 1: Create the source using the template
  • Step 2: Install dependencies for the newly generated source
  • Step 3: Define the inputs needed by your connector
  • Step 4: Implement connection checking
  • Step 5: Declare the schema of your streams
  • Step 6: Implement functionality for reading your streams
  • Step 7: Use the connector in Airbyte
  • Step 8: Write unit tests or integration tests

Each step of the Creating a Source checklist is explained in more detail below. We also mention how you can submit the connector to be included with the general Airbyte release at the end of the tutorial.

Explaining Each Step

Step 1: Create the source using the template

Airbyte provides a code generator which bootstraps the scaffolding for our connector.

$ cd airbyte-integrations/connector-templates/generator # assumes you are starting from the root of the Airbyte project.
$ ./generate.sh

Select the Python HTTP CDK Source template and then input the name of your connector. For this walk-through we will refer to our source as python-http-example. The finalized source code for this tutorial can be found here.

The source we will build in this tutorial will pull data from the Rates API, a free and open API which documents historical exchange rates for fiat currencies.

Step 2: Install dependencies for the newly generated source

Now that you've generated the module, let's navigate to its directory and install dependencies:

cd ../../connectors/source-<name>
python -m venv .venv # Create a virtual environment in the .venv directory
source .venv/bin/activate # enable the venv
pip install -r requirements.txt

This step sets up the initial python environment. All subsequent python or pip commands assume you have activated your virtual environment.

Let's verify everything is working as intended. Run:

python main_dev.py spec

You should see some output:

{"type": "SPEC", "spec": {"documentationUrl": "https://docsurl.com", "connectionSpecification": {"$schema": "http://json-schema.org/draft-07/schema#", "title": "Python Http Tutorial Spec", "type": "object", "required": ["TODO"], "additionalProperties": false, "properties": {"TODO: This schema defines the configuration required for the source. This usually involves metadata such as database and/or authentication information.": {"type": "string", "description": "describe me"}}}}}

We just ran Airbyte Protocol's spec command! We'll talk more about this later, but this is a simple sanity check to make sure everything is wired up correctly.

Note that the main_dev.py file is a simple script that makes it easy to run your connector. Its invocation format is python main_dev.py <command> [args]. See the module's generated README.md for the commands it supports.

Notes on iteration cycle

Dependencies

Python dependencies for your source should be declared in airbyte-integrations/connectors/source-<source-name>/setup.py in the install_requires field. You will notice that a couple of Airbyte dependencies are already declared there. Do not remove these; they give your source access to the helper interfaces provided by the generator.

You may notice that there is a requirements.txt in your source's directory as well. Don't edit this. It is autogenerated and used to provide Airbyte dependencies. All your dependencies should be declared in setup.py.

Development Environment

The commands we ran above created a Python virtual environment for your source. If you want your IDE to auto-complete and resolve dependencies properly, point it at the virtual env airbyte-integrations/connectors/source-<source-name>/.venv. Also, any time you change the dependencies in setup.py, make sure to re-run pip install -r requirements.txt.

Iterating on your implementation

There are two ways we recommend iterating on a source. Consider using whichever one matches your style.

Run the source using python

You'll notice in your source's directory that there is a python file called main_dev.py. This file exists as a convenience for development. You run it to test that your source works:

# from airbyte-integrations/connectors/source-<name>
python main_dev.py spec
python main_dev.py check --config secrets/config.json
python main_dev.py discover --config secrets/config.json
python main_dev.py read --config secrets/config.json --catalog sample_files/configured_catalog.json

The nice thing about this approach is that you can iterate completely within python. The downside is that you are not quite running your source as it will actually be run by Airbyte. Specifically, you're not running it from within the docker container that will house it.

Run the source using docker

If you want to run your source exactly as it will be run by Airbyte (i.e. within a docker container), you can use the following commands from the connector module directory (airbyte-integrations/connectors/source-python-http-example):

# First build the container
docker build . -t airbyte/source-<name>:dev

# Then use the following commands to run it
docker run --rm airbyte/source-python-http-example:dev spec
docker run --rm -v $(pwd)/secrets:/secrets airbyte/source-python-http-example:dev check --config /secrets/config.json
docker run --rm -v $(pwd)/secrets:/secrets airbyte/source-python-http-example:dev discover --config /secrets/config.json
docker run --rm -v $(pwd)/secrets:/secrets -v $(pwd)/sample_files:/sample_files airbyte/source-python-http-example:dev read --config /secrets/config.json --catalog /sample_files/configured_catalog.json

Note: Each time you make a change to your implementation you need to re-build the connector image via docker build . -t airbyte/source-<name>:dev. This ensures the new python code is added into the docker container.

The nice thing about this approach is that you are running your source exactly as it will be run by Airbyte. The tradeoff is that iteration is slightly slower, as the connector must be re-built after each change.

Step 3: Define the inputs required by your connector

Each connector declares the inputs it needs to read data from the underlying data source. This is the Airbyte Protocol's spec operation.

The simplest way to implement this is by creating a .json file in source_<name>/spec.json which describes your connector's inputs according to the ConnectorSpecification schema. This is a good place to start when developing your source. Using JsonSchema, define what the inputs are (e.g. username and password). Here's an example of what the spec.json looks like for the Freshdesk API source.

For more details on what the spec is, you can read about the Airbyte Protocol here.

The generated code that Airbyte provides handles implementing the spec method for you. It assumes that there will be a file called spec.json in the same directory as source.py. If you have declared the necessary JsonSchema in spec.json you should be done with this step.

Given that we'll be pulling currency data for our example source, we'll define the following spec.json:

{
  "documentationUrl": "https://docs.airbyte.io/integrations/sources/exchangeratesapi",
  "connectionSpecification": {
    "$schema": "http://json-schema.org/draft-07/schema#",
    "title": "Python Http Tutorial Spec",
    "type": "object",
    "required": ["start_date", "currency_base"],
    "additionalProperties": false,
    "properties": {
      "start_date": {
        "type": "string",
        "description": "Start getting data from that date.",
        "pattern": "^[0-9]{4}-[0-9]{2}-[0-9]{2}$",
        "examples": ["%Y-%m-%d"]
      },
      "base": {
        "type": "string",
        "examples": ["USD", "EUR"]
        "description": "ISO reference currency. See <a href=\"https://www.ecb.europa.eu/stats/policy_and_exchange_rates/euro_reference_exchange_rates/html/index.en.html\">here</a>."
      }
    }
  }
}

In addition to metadata, we define two inputs:

  • start_date: The beginning date to start tracking currency exchange rates from
  • base: The currency whose rates we're interested in tracking

Step 4: Implement connection checking

The second operation in the Airbyte Protocol that we'll implement is the check operation.

This operation verifies that the input configuration supplied by the user can be used to connect to the underlying data source. Note that this user-supplied configuration has the values described in the spec.json filled in. In other words, if the spec.json said that the source requires a username and password, the config object might be { "username": "airbyte", "password": "password123" }. You should then implement something that returns a JSON object reporting whether, given the credentials in the config, we were able to connect to the source.

In our case, this is a fairly trivial check since the API requires no credentials. Instead, let's verify that the user-input base currency is a legitimate currency. In source.py we'll find the following autogenerated source:

class SourcePythonHttpTutorial(AbstractSource):

    def check_connection(self, logger, config) -> Tuple[bool, any]:
        """
        TODO: Implement a connection check to validate that the user-provided config can be used to connect to the underlying API

        See https://github.com/airbytehq/airbyte/blob/master/airbyte-integrations/connectors/source-stripe/source_stripe/source.py#L232
        for an example.

        :param config:  the user-input config object conforming the connector's spec.json
        :param logger:  logger object
        :return Tuple[bool, any]: (True, None) if the input config can be used to connect to the API successfully, (False, error) otherwise.
        """
        return True, None

...

Following the docstring instructions, we'll change the implementation to verify that the input currency is a real currency:

    def check_connection(self, logger, config) -> Tuple[bool, any]:
        accepted_currencies = {"USD", "JPY", "BGN", "CZK", "DKK"}  # assume these are the only allowed currencies
        input_currency = config['base']
        if input_currency not in accepted_currencies:
            return False, f"Input currency {input_currency} is invalid. Please input one of the following currencies: {accepted_currencies}"
        else:
            return True, None

Let's test out this implementation by creating two config objects, one valid and one invalid, and giving them as input to the connector:

echo '{"start_date": "2021-04-01", "base": "USD"}'  > sample_files/config.json
echo '{"start_date": "2021-04-01", "base": "BTC"}'  > sample_files/invalid_config.json
python main_dev.py check --config sample_files/config.json
python main_dev.py check --config sample_files/invalid_config.json

You should see output like the following:

> python main_dev.py check --config sample_files/config.json
{"type": "CONNECTION_STATUS", "connectionStatus": {"status": "SUCCEEDED"}}

> python main_dev.py check --config sample_files/invalid_config.json
{"type": "CONNECTION_STATUS", "connectionStatus": {"status": "FAILED", "message": "Input currency BTC is invalid. Please input one of the following currencies: {'DKK', 'USD', 'CZK', 'BGN', 'JPY'}"}}

While developing, we recommend storing configs which contain secrets in secrets/config.json because the secrets directory is gitignored by default.

Step 5: Declare the schema of your streams

The discover method of the Airbyte Protocol returns an AirbyteCatalog: an object which declares all the streams output by a connector and their schemas. It also declares the sync modes supported by the stream (full refresh or incremental). See the catalog tutorial for more information.

This is a simple task with the Airbyte CDK. For each stream in our connector we'll need to:

  1. Create a python class in source.py which extends HttpStream
  2. Place a <stream_name>.json file in the source_<name>/schemas/ directory. The name of the file should be the snake_case name of the stream whose schema it describes, and its contents should be the JsonSchema describing the output from that stream.

Let's create a class in source.py which extends HttpStream. You'll notice there are classes with extensive comments describing what needs to be done to implement various connector features. Feel free to read these classes as needed. But for the purposes of this tutorial, let's assume that we are adding classes from scratch either by deleting those generated classes or editing them to match the implementation below.

We'll begin by creating a stream to represent the data that we're pulling from the Exchange Rates API:

class ExchangeRates(HttpStream):
    url_base = "https://api.ratesapi.io/"

    def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
        # The API does not offer pagination, so we return None to indicate there are no more pages in the response
        return None

    def path(
        self, 
        stream_state: Mapping[str, Any] = None, 
        stream_slice: Mapping[str, Any] = None, 
        next_page_token: Mapping[str, Any] = None
    ) -> str:
        return ""  # TODO

    def parse_response(
        self,
        response: requests.Response,
        stream_state: Mapping[str, Any],
        stream_slice: Mapping[str, Any] = None,
        next_page_token: Mapping[str, Any] = None,
    ) -> Iterable[Mapping]:
        return None  # TODO 
        

Note that this implementation is entirely empty -- we haven't actually done anything. We'll come back to this in the next step. But for now we just want to declare the schema of this stream. We'll declare this as a stream that the connector outputs by returning it from the streams method:

class SourcePythonHttpTutorial(AbstractSource):

    def check_connection(self, logger, config) -> Tuple[bool, any]:
        ...

    def streams(self, config: Mapping[str, Any]) -> List[Stream]:
        # NoAuth just means there is no authentication required for this API and is included for completeness.
        # Skip passing an authenticator if no authentication is required.
        # Other authenticators are available for API token-based auth and Oauth2. 
        auth = NoAuth()  
        return [ExchangeRates(authenticator=auth)]

Having created this stream in code, we'll put a file exchange_rates.json in the schemas/ folder. You can download the JSON file describing the output schema here for convenience and place it in schemas/.
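If you prefer to write the schema file by hand, it's plain JsonSchema describing a single record. Here's an abbreviated sketch trimmed to three currencies for brevity (the downloadable file lists many more):

{
  "$schema": "http://json-schema.org/draft-04/schema#",
  "type": "object",
  "properties": {
    "base": { "type": "string" },
    "rates": {
      "type": "object",
      "properties": {
        "USD": { "type": "number" },
        "EUR": { "type": "number" },
        "JPY": { "type": "number" }
      }
    },
    "date": { "type": "string" }
  }
}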

With the .json schema file in place, let's see if the connector can now find this schema and produce a valid catalog:

python main_dev.py discover --config sample_files/config.json

You should see some output like:

{"type": "CATALOG", "catalog": {"streams": [{"name": "exchange_rates", "json_schema": {"$schema": "http://json-schema.org/draft-04/schema#", "type": "object", "properties": {"base": {"type": "string"}, "rates": {"type": "object", "properties": {"GBP": {"type": "number"}, "HKD": {"type": "number"}, "IDR": {"type": "number"}, "PHP": {"type": "number"}, "LVL": {"type": "number"}, "INR": {"type": "number"}, "CHF": {"type": "number"}, "MXN": {"type": "number"}, "SGD": {"type": "number"}, "CZK": {"type": "number"}, "THB": {"type": "number"}, "BGN": {"type": "number"}, "EUR": {"type": "number"}, "MYR": {"type": "number"}, "NOK": {"type": "number"}, "CNY": {"type": "number"}, "HRK": {"type": "number"}, "PLN": {"type": "number"}, "LTL": {"type": "number"}, "TRY": {"type": "number"}, "ZAR": {"type": "number"}, "CAD": {"type": "number"}, "BRL": {"type": "number"}, "RON": {"type": "number"}, "DKK": {"type": "number"}, "NZD": {"type": "number"}, "EEK": {"type": "number"}, "JPY": {"type": "number"}, "RUB": {"type": "number"}, "KRW": {"type": "number"}, "USD": {"type": "number"}, "AUD": {"type": "number"}, "HUF": {"type": "number"}, "SEK": {"type": "number"}}}, "date": {"type": "string"}}}, "supported_sync_modes": ["full_refresh"]}]}}

It's that simple! The connector now knows how to declare your stream's schema. We declare only one stream since our source is simple, but the principle is exactly the same if you have many streams.

You can also dynamically define schemas, but that's beyond the scope of this tutorial. See the schema docs for more information.

Step 6: Read data from the API

Describing schemas is good and all, but at some point we have to start reading data! So let's get to work. But first, let's describe what we're about to do:

As described in the concepts documentation, the HttpStream superclass facilitates reading data from HTTP endpoints. It contains built-in functions and helpers for:

  • authentication
  • pagination
  • handling rate limiting or transient errors
  • and other useful functionality

In order for it to be able to do this, we have to provide it with a few inputs:

  • the URL base and path of the endpoint we'd like to hit
  • how to parse the response from the API
  • how to perform pagination

Optionally, we can provide additional inputs to customize requests:

  • request parameters and headers
  • how to recognize rate limit errors, and how long to wait (by default it retries 429 and 5XX errors using exponential backoff)
  • HTTP method and request body if applicable
  • configure exponential backoff policy

Backoff policy options:

  • retry_factor: specifies the factor for the exponential backoff policy (default: 5)
  • max_retries: specifies the maximum number of retries for the backoff policy (default: 5)
  • raise_on_http_errors: if set to False, allows opting out of raising exceptions on HTTP error codes (default: True)

There are many other customizable options - you can find them in the base_python.cdk.streams.http.HttpStream class.
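For example, a stream that needs a different policy can override these as plain attributes. A minimal sketch applied to this tutorial's stream (the values are arbitrary, for illustration only):

class ExchangeRates(HttpStream):
    url_base = "https://api.ratesapi.io/"

    # Illustrative overrides of the backoff options listed above:
    max_retries = 10               # allow up to 10 retries instead of the default 5
    retry_factor = 10              # stretch the exponential backoff between attempts
    raise_on_http_errors = False   # don't raise on HTTP errors that aren't retried

    ...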

So in order to read data from the exchange rates API, we'll fill out the necessary information for the stream to do its work. First, we'll implement a basic read that just reads the last day's exchange rates, then we'll implement incremental sync using stream slicing.

Let's begin by pulling data for the last day's rates by using the /latest endpoint:

class ExchangeRates(HttpStream):
    url_base = "https://api.ratesapi.io/"
    
    def __init__(self, base: str, **kwargs):
        super().__init__()
        self.base = base

    
    def path(
        self, 
        stream_state: Mapping[str, Any] = None, 
        stream_slice: Mapping[str, Any] = None, 
        next_page_token: Mapping[str, Any] = None
    ) -> str:
        # The "/latest" path gives us the latest currency exchange rates
        return "latest"  

    def request_params(
            self,
            stream_state: Mapping[str, Any],
            stream_slice: Mapping[str, Any] = None,
            next_page_token: Mapping[str, Any] = None,
    ) -> MutableMapping[str, Any]:
        # The api requires that we include the base currency as a query param so we do that in this method
        return {'base': self.base}

    def parse_response(
            self,
            response: requests.Response,
            stream_state: Mapping[str, Any],
            stream_slice: Mapping[str, Any] = None,
            next_page_token: Mapping[str, Any] = None,
    ) -> Iterable[Mapping]:
        # The response is a simple JSON whose schema matches our stream's schema exactly, 
        # so we just return a list containing the response
        return [response.json()]

    def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
        # The API does not offer pagination, 
        # so we return None to indicate there are no more pages in the response
        return None

This may look big, but that's just because there are lots of (unused, for now) parameters in these methods (those can be hidden with Python's **kwargs, but don't worry about it for now). Really we just added a few lines of "significant" code:

  1. Added a constructor __init__ which stores the base currency to query for.
  2. return {'base': self.base} to add the ?base=<base-value> query parameter to the request based on the base input by the user.
  3. return [response.json()] to parse the response from the API to match the schema of our schema .json file.
  4. return "latest" to indicate that we want to hit the /latest endpoint of the API to get the latest exchange rate data.

Let's also pass the base parameter input by the user to the stream class:

def streams(self, config: Mapping[str, Any]) -> List[Stream]:
    auth = NoAuth()
    return [ExchangeRates(authenticator=auth, base=config['base'])]

We're now ready to query the API!

To do this, we'll need a ConfiguredCatalog. We've prepared one here -- download this and place it in sample_files/configured_catalog.json. Then run:

 python main_dev.py read --config sample_files/config.json --catalog sample_files/configured_catalog.json

You should see some output lines, one of which is a record from the API:

{"type": "RECORD", "record": {"stream": "exchange_rates", "data": {"base": "USD", "rates": {"GBP": 0.7196938353, "HKD": 7.7597848573, "IDR": 14482.4824162185, "ILS": 3.2412081092, "DKK": 6.1532478279, "INR": 74.7852709971, "CHF": 0.915763343, "MXN": 19.8439387671, "CZK": 21.3545717832, "SGD": 1.3261894911, "THB": 31.4398014067, "HRK": 6.2599917253, "EUR": 0.8274720728, "MYR": 4.0979726934, "NOK": 8.3043442284, "CNY": 6.4856433595, "BGN": 1.61836988, "PHP": 48.3516756309, "PLN": 3.770872983, "ZAR": 14.2690111709, "CAD": 1.2436905254, "ISK": 124.9482829954, "BRL": 5.4526272238, "RON": 4.0738932561, "NZD": 1.3841125362, "TRY": 8.3101365329, "JPY": 108.0182043856, "RUB": 74.9555647497, "KRW": 1111.7583781547, "USD": 1.0, "AUD": 1.2840711626, "HUF": 300.6206040546, "SEK": 8.3829540753}, "date": "2021-04-26"}, "emitted_at": 1619498062000}}

There we have it - a stream which reads data in just a few lines of code!

We theoretically could stop here and call it a connector. But let's give adding incremental sync a shot.

Adding incremental sync

To add incremental sync, we'll do a few things:

  1. Pass the start_date param input by the user into the stream.
  2. Declare the stream's cursor_field.
  3. Implement the get_updated_state method.
  4. Implement the stream_slices method.
  5. Update the path method to specify the date to pull exchange rates for.
  6. Update the configured catalog to use incremental sync when we're testing the stream.

We'll describe what each of these methods do below. Before we begin, it may help to familiarize yourself with how incremental sync works in Airbyte by reading the docs on incremental.

To keep things concise, we'll only show functions as we edit them one by one.

Let's get the easy parts out of the way and pass the start_date:

def streams(self, config: Mapping[str, Any]) -> List[Stream]:
    auth = NoAuth()
    # Parse the date from a string into a datetime object
    start_date = datetime.strptime(config['start_date'], '%Y-%m-%d')
    return [ExchangeRates(authenticator=auth, base=config['base'], start_date=start_date)]

Let's also add this parameter to the constructor and declare the cursor_field:

from datetime import datetime, timedelta


class ExchangeRates(HttpStream):
    url_base = "https://api.ratesapi.io/"
    cursor_field = "date"

    def __init__(self, base: str, start_date: datetime, **kwargs):
        super().__init__()
        self.base = base
        self.start_date = start_date

Declaring the cursor_field informs the framework that this stream now supports incremental sync. The next time you run python main_dev.py discover --config sample_files/config.json you'll find that the supported_sync_modes field now also contains incremental.

But we're not quite done with supporting incremental; we have to actually emit state! We'll structure our state object very simply: it will be a dict whose single key is 'date' and whose value is the date of the last day we synced data from. For example, {'date': '2021-04-26'} indicates the connector previously read data up until April 26th and therefore shouldn't re-read anything before April 26th.

Let's do this by implementing the get_updated_state method inside the ExchangeRates class.

    def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, any]:
        # This method is called once for each record returned from the API to compare the cursor field value in that record with the current state
        # we then return an updated state object. If this is the first time we run a sync or no state was passed, current_stream_state will be None.
        if current_stream_state is not None and 'date' in current_stream_state:
            current_parsed_date = datetime.strptime(current_stream_state['date'], '%Y-%m-%d')
            latest_record_date = datetime.strptime(latest_record['date'], '%Y-%m-%d')
            return {'date': max(current_parsed_date, latest_record_date).strftime('%Y-%m-%d')}
        else:
            return {'date': self.start_date.strftime('%Y-%m-%d')}

This implementation compares the date from the latest record with the date in the current state and takes the maximum as the "new" state object.
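As a standalone illustration of that comparison (not part of the connector code):

from datetime import datetime

# Given a stored state and a newer record, the newer date wins:
current_state = {'date': '2021-04-25'}
latest_record = {'date': '2021-04-26'}
new_cursor = max(
    datetime.strptime(current_state['date'], '%Y-%m-%d'),
    datetime.strptime(latest_record['date'], '%Y-%m-%d'),
)
print({'date': new_cursor.strftime('%Y-%m-%d')})  # {'date': '2021-04-26'}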

We'll implement the stream_slices method to return a list of the dates for which we should pull data based on the stream state if it exists:

    def _chunk_date_range(self, start_date: datetime) -> List[Mapping[str, any]]:
        """
        Returns a list of each day between the start date and now.
        The return value is a list of dicts {'date': date_string}.
        """
        dates = []
        while start_date < datetime.now():
            dates.append({'date': start_date.strftime('%Y-%m-%d')})
            start_date += timedelta(days=1)
        return dates

    def stream_slices(
        self, sync_mode, cursor_field: List[str] = None, stream_state: Mapping[str, Any] = None
    ) -> Iterable[Optional[Mapping[str, any]]]:
        start_date = datetime.strptime(stream_state['date'], '%Y-%m-%d') if stream_state and 'date' in stream_state else self.start_date
        return self._chunk_date_range(start_date)

Each slice will cause an HTTP request to be made to the API. We can then use the information present in the stream_slice parameter (a single element from the list we constructed in stream_slices above) to set other configurations for the outgoing request like path or request_params. For more info about stream slicing, see the slicing docs.
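To see roughly what the slices look like, here's the date-chunking logic run standalone (the helper above copied outside the class, for demonstration only):

from datetime import datetime, timedelta

def chunk_date_range(start_date: datetime):
    # Same logic as _chunk_date_range above, extracted for demonstration.
    dates = []
    while start_date < datetime.now():
        dates.append({'date': start_date.strftime('%Y-%m-%d')})
        start_date += timedelta(days=1)
    return dates

print(chunk_date_range(datetime.now() - timedelta(days=3)))
# e.g. [{'date': '2021-04-23'}, {'date': '2021-04-24'}, {'date': '2021-04-25'}, {'date': '2021-04-26'}]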

In order to pull data for a specific date, the Exchange Rates API requires that we pass the date as the path component of the URL. Let's override the path method to achieve this:

def path(self, stream_state: Mapping[str, Any] = None, stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None) -> str:
    return stream_slice['date']
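Combined with the request_params we defined earlier, each daily slice now translates into a request like GET https://api.ratesapi.io/2021-04-26?base=USD (date shown for illustration).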

With these changes, your implementation should look like the file here.

The last thing we need to do is change the sync_mode field in the sample_files/configured_catalog.json to incremental:

"sync_mode": "incremental",

We should now have a working implementation of incremental sync!

Let's try it out:

python main_dev.py read --config sample_files/config.json --catalog sample_files/configured_catalog.json

You should see a bunch of RECORD messages and STATE messages. To verify that incremental sync is working, pass the input state back to the connector and run it again:

# Save the latest state to sample_files/state.json
python main_dev.py read --config sample_files/config.json --catalog sample_files/configured_catalog.json | grep STATE | tail -n 1 | jq .state.data > sample_files/state.json

# Run a read operation with the latest state message
python main_dev.py read --config sample_files/config.json --catalog sample_files/configured_catalog.json --state sample_files/state.json

You should see that only the record from the last date is being synced! This is acceptable behavior, since Airbyte requires at-least-once delivery of records, so repeating the last record twice is OK.
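If you inspect sample_files/state.json, it should contain the per-stream cursor we extracted with jq, along the lines of {"exchange_rates": {"date": "2021-04-26"}} (the exact date depends on when you run the sync).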

With that, we've implemented incremental sync for our connector!

Step 7: Use the connector in Airbyte

To use your connector in your own installation of Airbyte, build the docker image for your container by running docker build . -t airbyte/source-python-http-example:dev. Then, follow the instructions from the building a python source tutorial for using the connector in the Airbyte UI, replacing the name as appropriate.

Note: your built docker image must be accessible to the docker daemon running on the Airbyte node. If you're doing this tutorial locally, these instructions are sufficient. Otherwise you may need to push your Docker image to Dockerhub.

Step 8: Test your connector

Unit Tests

Add any relevant unit tests to the unit_tests directory. Unit tests should not depend on any secrets.

You can run the tests using python -m pytest -s unit_tests.
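For instance, here's a minimal sketch of a unit test for the check_connection logic we wrote earlier (the package name source_python_http_example is assumed from this tutorial's connector name; adjust it to match yours):

from unittest.mock import MagicMock

from source_python_http_example.source import SourcePythonHttpTutorial


def test_check_connection_rejects_unknown_currency():
    source = SourcePythonHttpTutorial()
    # "BTC" is not in the accepted currencies set, so the check should fail.
    ok, error = source.check_connection(MagicMock(), {"base": "BTC", "start_date": "2021-04-01"})
    assert ok is False
    assert "BTC" in error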

Integration Tests

Place any integration tests in the integration_tests directory such that they can be discovered by pytest.

Standard Tests

Standard tests are a fixed set of tests Airbyte provides that every Airbyte source connector must pass. While they're only required if you intend to submit your connector to Airbyte, you might find them helpful in any case. See Testing your connectors.

If you want to submit this connector to become a default connector within Airbyte, follow step 8 onwards from the Python source checklist.