
🎉 New Source: Convex.dev (#18403)

* convex source

* fixed unit tests

* use the smaller test instance

* fix integration tests and pagination

* fix tableName casing in api call

* fix logo

* more shapes -- thinking about moving this to an internal api

* use new /json_schemas api

* use new APIs

* remove unused start_date

* doc skeleton

* fix deletes with incremental sync

* fix test of json_schema

* fix expected records

* merge

* version and docs

* fix test

* fix docs url

* mention that you need to ask convex to enable it for your account

* docs

* two stage pagination for initial sync. some unit tests not working

* fixed all the unit tests, but we need to push backends before acceptance tests work

* fix snapshot pagination

* fix integration tests

* some checkboxes from the PR description

* Update airbyte-integrations/connectors/source-convex/source_convex/source.py

Co-authored-by: Sujay Jayakar <sujayakar314+github@gmail.com>

* Update airbyte-integrations/connectors/source-convex/source_convex/source.py

Co-authored-by: Sujay Jayakar <sujayakar314+github@gmail.com>

* stop storing the delta_has_more in state

* fix mypy

* use UTC timestamps

* rename instance_name to deployment_name

* include data format request param

* remove dev docker image

* fix unit test

* documentation

* .

* code review

* documentation and change deployment_name to deployment_url

* fix pip dependency

* regenerate the spec file

Co-authored-by: Sujay Jayakar <sujayakar314+github@gmail.com>
Author: Lee Danilek
Date: 2022-11-08 15:53:40 -08:00
Committed by: GitHub
Parent: 8be61b7e9a
Commit: 29d74fc6fd
30 changed files with 1036 additions and 0 deletions

View File: .dockerignore

@@ -0,0 +1,6 @@
*
!Dockerfile
!main.py
!source_convex
!setup.py
!secrets

View File: Dockerfile

@@ -0,0 +1,38 @@
FROM python:3.9.13-alpine3.15 as base
# build and load all requirements
FROM base as builder
WORKDIR /airbyte/integration_code
# upgrade pip to the latest version
RUN apk --no-cache upgrade \
&& pip install --upgrade pip \
&& apk --no-cache add tzdata build-base
COPY setup.py ./
# install necessary packages to a temporary folder
RUN pip install --prefix=/install .
# build a clean environment
FROM base
WORKDIR /airbyte/integration_code
# copy all loaded and built libraries to a pure basic image
COPY --from=builder /install /usr/local
# add default timezone settings
COPY --from=builder /usr/share/zoneinfo/Etc/UTC /etc/localtime
RUN echo "Etc/UTC" > /etc/timezone
# bash is installed for more convenient debugging.
RUN apk --no-cache add bash
# copy payload code only
COPY main.py ./
COPY source_convex ./source_convex
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]
LABEL io.airbyte.version=0.1.0
LABEL io.airbyte.name=airbyte/source-convex

View File: README.md

@@ -0,0 +1,132 @@
# Convex Source
This is the repository for the Convex source connector, written in Python.
For information about how to use this connector within Airbyte, see [the documentation](https://docs.airbyte.io/integrations/sources/convex).
## Local development
### Prerequisites
**To iterate on this connector, make sure to complete the prerequisites in this section.**
#### Minimum Python version required `= 3.9.0`
#### Build & Activate Virtual Environment and install dependencies
From this connector directory, create a virtual environment:
```
python -m venv .venv
```
This will generate a virtualenv for this module in `.venv/`. Make sure this venv is active in your
development environment of choice. To activate it from the terminal, run:
```
source .venv/bin/activate
pip install -r requirements.txt
pip install '.[tests]'
```
If you are in an IDE, follow your IDE's instructions to activate the virtualenv.
Note that while we are installing dependencies from `requirements.txt`, you should only edit `setup.py` for your dependencies. `requirements.txt` is
used for editable installs (`pip install -e`) to pull in Python dependencies from the monorepo and will call `setup.py`.
If this is mumbo jumbo to you, don't worry about it: just put your deps in `setup.py` but install using `pip install -r requirements.txt`, and everything
should work as you expect.
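Concretely, this connector's `requirements.txt` (included later in this commit) contains nothing but editable installs:
```
-e ../../bases/source-acceptance-test
-e .
```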
#### Building via Gradle
You can also build the connector with Gradle. This is typically used in CI and not needed for your development workflow.
To build using Gradle, from the Airbyte repository root, run:
```
./gradlew :airbyte-integrations:connectors:source-convex:build
```
#### Create credentials
**If you are a community contributor**, follow the instructions in the [documentation](https://docs.airbyte.io/integrations/sources/convex)
to generate the necessary credentials. Then create a file `secrets/config.json` conforming to the `source_convex/spec.yaml` file.
Note that any directory named `secrets` is gitignored across the entire Airbyte repo, so there is no danger of accidentally checking in sensitive information.
See `integration_tests/sample_config.json` for a sample config file.
**If you are an Airbyte core member**, copy the credentials in Lastpass under the secret name `source convex test creds`
and place them into `secrets/config.json`.
### Locally running the connector
```
python main.py spec
python main.py check --config secrets/config.json
python main.py discover --config secrets/config.json
python main.py read --config secrets/config.json --catalog integration_tests/configured_catalog.json
```
### Locally running the connector docker image
#### Build
First, make sure you build the latest Docker image:
```
docker build . -t airbyte/source-convex:dev
```
You can also build the connector image via Gradle:
```
./gradlew :airbyte-integrations:connectors:source-convex:airbyteDocker
```
When building via Gradle, the docker image name and tag, respectively, are the values of the `io.airbyte.name` and `io.airbyte.version` `LABEL`s in
the Dockerfile; with the Dockerfile in this commit, that yields `airbyte/source-convex:0.1.0`.
#### Run
Then run any of the connector commands as follows:
```
docker run --rm airbyte/source-convex:dev spec
docker run --rm -v $(pwd)/secrets:/secrets airbyte/source-convex:dev check --config /secrets/config.json
docker run --rm -v $(pwd)/secrets:/secrets airbyte/source-convex:dev discover --config /secrets/config.json
docker run --rm -v $(pwd)/secrets:/secrets -v $(pwd)/integration_tests:/integration_tests airbyte/source-convex:dev read --config /secrets/config.json --catalog /integration_tests/configured_catalog.json
```
## Testing
Make sure to familiarize yourself with [pytest test discovery](https://docs.pytest.org/en/latest/goodpractices.html#test-discovery) to know how your test files and methods should be named.
First install test dependencies into your virtual environment:
```
pip install .[tests]
```
### Unit Tests
To run unit tests locally, from the connector directory run:
```
python -m pytest unit_tests
```
### Integration Tests
There are two types of integration tests: Acceptance Tests (Airbyte's test suite for all source connectors) and custom integration tests (which are specific to this connector).
#### Custom Integration tests
Place custom tests inside the `integration_tests/` folder, then, from the connector root, run
```
python -m pytest integration_tests
```
#### Acceptance Tests
Customize `acceptance-test-config.yml` file to configure tests. See [Source Acceptance Tests](https://docs.airbyte.io/connector-development/testing-connectors/source-acceptance-tests-reference) for more information.
If your connector requires creating or destroying resources for use during acceptance tests, create fixtures for them and place them inside `integration_tests/acceptance.py` (a sketch follows the command below).
To run your integration tests with acceptance tests, from the connector root, run
```
python -m pytest integration_tests -p integration_tests.acceptance
```
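For illustration only, a hypothetical version of `integration_tests/acceptance.py` that seeds data before the suite and cleans up afterwards might look like the sketch below; `seed_test_documents` and `remove_test_documents` are made-up helpers, not part of this connector:
```
import pytest

pytest_plugins = ("source_acceptance_test.plugin",)


def seed_test_documents():
    # Made-up helper: insert fixture documents into the test deployment.
    return []


def remove_test_documents(documents):
    # Made-up helper: delete the fixture documents again.
    pass


@pytest.fixture(scope="session", autouse=True)
def connector_setup():
    documents = seed_test_documents()
    yield
    remove_test_documents(documents)
```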
To run the acceptance tests with Docker instead, use the `acceptance-test-docker.sh` script included with this connector.
### Using Gradle to run tests
All commands should be run from the Airbyte project root.
To run unit tests:
```
./gradlew :airbyte-integrations:connectors:source-convex:unitTest
```
To run acceptance and custom integration tests:
```
./gradlew :airbyte-integrations:connectors:source-convex:integrationTest
```
## Dependency Management
All of your dependencies should go in `setup.py`, NOT `requirements.txt`. The requirements file is only used to connect internal Airbyte dependencies in the monorepo for local development.
We split dependencies between two groups:
* dependencies required for your connector to work go in the `MAIN_REQUIREMENTS` list.
* dependencies required for testing go in the `TEST_REQUIREMENTS` list.
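For reference, this connector's `setup.py` (shown later in this commit) wires the two groups into `install_requires` and `extras_require`:
```
from setuptools import find_packages, setup

MAIN_REQUIREMENTS = ["airbyte-cdk~=0.2"]
TEST_REQUIREMENTS = ["pytest~=6.1", "pytest-mock~=3.6.1", "source-acceptance-test", "responses~=0.13.3"]

setup(
    name="source_convex",
    packages=find_packages(),
    install_requires=MAIN_REQUIREMENTS,  # needed for the connector to run
    extras_require={"tests": TEST_REQUIREMENTS},  # installed via `pip install '.[tests]'`
)
```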
### Publishing a new version of the connector
You've checked out the repo, implemented a million-dollar feature, and you're ready to share your changes with the world. Now what?
1. Make sure your changes are passing unit and integration tests.
1. Bump the connector version in `Dockerfile` -- just increment the value of the `LABEL io.airbyte.version` appropriately (we use [SemVer](https://semver.org/)).
1. Create a Pull Request.
1. Pat yourself on the back for being an awesome contributor.
1. Someone from Airbyte will take a look at your PR and iterate with you to merge it into master.

View File: acceptance-test-config.yml

@@ -0,0 +1,29 @@
# See [Source Acceptance Tests](https://docs.airbyte.io/connector-development/testing-connectors/source-acceptance-tests-reference)
# for more information about how to configure these tests
connector_image: airbyte/source-convex:dev
tests:
  spec:
    - spec_path: "source_convex/spec.yaml"
  connection:
    - config_path: "secrets/config.json"
      status: "succeed"
    - config_path: "integration_tests/invalid_config.json"
      status: "failed"
  discovery:
    - config_path: "secrets/config.json"
  basic_read:
    - config_path: "secrets/config.json"
      configured_catalog_path: "integration_tests/configured_catalog.json"
      empty_streams: []
      expect_records:
        path: "integration_tests/expected_records.txt"
        extra_fields: no
        exact_order: yes
        extra_records: yes
  incremental:
    - config_path: "secrets/config.json"
      configured_catalog_path: "integration_tests/configured_catalog.json"
      future_state_path: "integration_tests/abnormal_state.json"
  full_refresh:
    - config_path: "secrets/config.json"
      configured_catalog_path: "integration_tests/configured_catalog.json"

View File: acceptance-test-docker.sh

@@ -0,0 +1,16 @@
#!/usr/bin/env sh

# Build latest connector image
docker build . -t $(cat acceptance-test-config.yml | grep "connector_image" | head -n 1 | cut -d: -f2-)

# Pull latest acctest image
docker pull airbyte/source-acceptance-test:latest

# Run
docker run --rm -it \
    -v /var/run/docker.sock:/var/run/docker.sock \
    -v /tmp:/tmp \
    -v $(pwd):/test_input \
    airbyte/source-acceptance-test \
    --acceptance-test-config /test_input

View File: bootstrap.md

@@ -0,0 +1,18 @@
# Convex
## Overview
Convex is the reactive backend-as-a-service for web developers.
As part of the backend, Convex stores developer-defined documents in tables.
Convex's HTTP API allows a developer to retrieve documents from their Convex tables.
## Endpoints
Convex defines three endpoints used for extracting data:
1. `/json_schema` identifies the data format for each table.
2. `/list_snapshot` returns pages of a table's data at a snapshot timestamp, for initial sync.
3. `/document_deltas` returns pages of modifications to a table's data after a given timestamp.
For more details, see the documentation for Convex Sync endpoints at
[https://docs.convex.dev/http-api/#sync](https://docs.convex.dev/http-api/#sync).
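A minimal sketch of the resulting two-stage sync, assuming only the request parameters and response fields (`values`, `cursor`, `hasMore`, and, for `/list_snapshot`, `snapshot`) described in that documentation; the deployment URL and access key below are placeholders:
```
import requests

DEPLOYMENT_URL = "https://murky-swan-635.convex.cloud"  # placeholder deployment
HEADERS = {"Authorization": "Convex <access_key>"}  # placeholder access key


def sync_table(table_name):
    # Stage 1: page through a consistent snapshot of the table.
    params = {"tableName": table_name, "format": "convex_json"}
    while True:
        page = requests.get(f"{DEPLOYMENT_URL}/api/list_snapshot", params=params, headers=HEADERS).json()
        yield from page["values"]
        # Reusing the returned snapshot timestamp keeps all pages consistent.
        params["snapshot"] = page["snapshot"]
        params["cursor"] = page["cursor"]
        if not page["hasMore"]:
            break
    # Stage 2: follow modifications made after the snapshot timestamp.
    params = {"tableName": table_name, "format": "convex_json", "cursor": params["snapshot"]}
    while True:
        page = requests.get(f"{DEPLOYMENT_URL}/api/document_deltas", params=params, headers=HEADERS).json()
        yield from page["values"]
        params["cursor"] = page["cursor"]
        if not page["hasMore"]:
            break
```
The connector's `ConvexStream` (in `source_convex/source.py` below) implements the same walk, persisting `snapshot_cursor`, `snapshot_has_more`, and `delta_cursor` as Airbyte stream state so a sync can resume mid-walk.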

View File: build.gradle

@@ -0,0 +1,9 @@
plugins {
    id 'airbyte-python'
    id 'airbyte-docker'
    id 'airbyte-source-acceptance-test'
}

airbytePython {
    moduleDirectory 'source_convex'
}

View File: integration_tests/__init__.py

@@ -0,0 +1,3 @@
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

View File: integration_tests/abnormal_state.json

@@ -0,0 +1,12 @@
{
  "posts": {
    "snapshot_cursor": "hi",
    "snapshot_has_more": false,
    "delta_cursor": 2652635567679741986
  },
  "users": {
    "snapshot_cursor": "hi",
    "snapshot_has_more": false,
    "delta_cursor": 2660025892355943945
  }
}

View File: integration_tests/acceptance.py

@@ -0,0 +1,14 @@
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

import pytest

pytest_plugins = ("source_acceptance_test.plugin",)


@pytest.fixture(scope="session", autouse=True)
def connector_setup():
    """This fixture is a placeholder for external resources that acceptance test might require."""
    yield

View File: integration_tests/configured_catalog.json

@@ -0,0 +1,56 @@
{
  "streams": [
    {
      "sync_mode": "incremental",
      "destination_sync_mode": "append",
      "stream": {
        "name": "posts",
        "json_schema": {
          "type": "object",
          "properties": {
            "_creationTime": { "type": "number" },
            "_id": {
              "type": "object",
              "properties": { "$id": { "type": "string" } }
            },
            "author": {
              "type": "object",
              "properties": { "$id": { "type": "string" } }
            },
            "body": { "type": "string" },
            "time": { "type": "number" },
            "_ts": { "type": "number" }
          }
        },
        "supported_sync_modes": ["full_refresh", "incremental"],
        "source_defined_cursor": true,
        "default_cursor_field": ["_ts"],
        "source_defined_primary_key": [["_id"]]
      }
    },
    {
      "sync_mode": "incremental",
      "destination_sync_mode": "append",
      "stream": {
        "name": "users",
        "json_schema": {
          "type": "object",
          "properties": {
            "_creationTime": { "type": "number" },
            "_id": {
              "type": "object",
              "properties": { "$id": { "type": "string" } }
            },
            "name": { "type": "string" },
            "tokenIdentifier": { "type": "string" },
            "_ts": { "type": "number" }
          }
        },
        "supported_sync_modes": ["full_refresh", "incremental"],
        "source_defined_cursor": true,
        "default_cursor_field": ["_ts"],
        "source_defined_primary_key": [["_id"]]
      }
    }
  ]
}

View File: integration_tests/expected_records.txt

@@ -0,0 +1,3 @@
{"stream": "users", "data": {"_id": {"$id": "users|wnmxwZrHuQa8TWcCXl8faAW"}, "_creationTime": 1652593901795.4175, "name": "Lee Danilek", "tokenIdentifier": "https://dev-wgspahhl.us.auth0.com/|google-oauth2|116819734026499097324", "_ts": 1660025892365542190, "_ab_cdc_deleted_at": null, "_ab_cdc_lsn": 1660025892365542190, "_ab_cdc_updated_at": "2022-08-09T06:18:12.365542", "_deleted": false}, "emitted_at": 1665605909352}
{"stream": "posts", "data": {"_id": {"$id": "posts|UZs05arHuQa8TWcCXl8faAW"}, "_creationTime": 1652595311880.4985, "author": {"$id": "users|wnmxwZrHuQa8TWcCXl8faAW"}, "body": "first!", "time": 1652595311875.6301, "_ts": 1660025892355943945, "_ab_cdc_deleted_at": null, "_ab_cdc_lsn": 1660025892355943945, "_ab_cdc_updated_at": "2022-08-09T06:18:12.355944", "_deleted": false}, "emitted_at": 1665605909353}
{"stream": "posts", "data": {"_id": {"$id": "posts|iQKB5arHuQa8TWcCXl8faAW"}, "_creationTime": 1652595855409.799, "author": {"$id": "users|wnmxwZrHuQa8TWcCXl8faAW"}, "body": "second!", "time": 1652595855404.5964, "_ts": 1660025892355943945, "_ab_cdc_deleted_at": null, "_ab_cdc_lsn": 1660025892355943945, "_ab_cdc_updated_at": "2022-08-09T06:18:12.355944", "_deleted": false}, "emitted_at": 1665605909354}

View File: integration_tests/invalid_config.json

@@ -0,0 +1,4 @@
{
  "deployment_url": "https://murky-swan-635.convex.cloud",
  "access_key": "bad"
}

View File: integration_tests/sample_config.json

@@ -0,0 +1,4 @@
{
  "deployment_url": "https://descriptive-vulture-260.convex.cloud",
  "access_key": "Your access token"
}

View File: integration_tests/sample_state.json

@@ -0,0 +1,12 @@
{
  "posts": {
    "snapshot_cursor": "hi",
    "snapshot_has_more": false,
    "delta_cursor": 1
  },
  "users": {
    "snapshot_cursor": null,
    "snapshot_has_more": true,
    "delta_cursor": null
  }
}

View File: main.py

@@ -0,0 +1,13 @@
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

import sys

from airbyte_cdk.entrypoint import launch
from source_convex import SourceConvex

if __name__ == "__main__":
    source = SourceConvex()
    launch(source, sys.argv[1:])

View File: requirements.txt

@@ -0,0 +1,2 @@
-e ../../bases/source-acceptance-test
-e .

View File: setup.py

@@ -0,0 +1,30 @@
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

from setuptools import find_packages, setup

MAIN_REQUIREMENTS = [
    "airbyte-cdk~=0.2",
]

TEST_REQUIREMENTS = [
    "pytest~=6.1",
    "pytest-mock~=3.6.1",
    "source-acceptance-test",
    "responses~=0.13.3",
]

setup(
    name="source_convex",
    description="Source implementation for Convex.",
    author="Airbyte",
    author_email="contact@airbyte.io",
    packages=find_packages(),
    install_requires=MAIN_REQUIREMENTS,
    package_data={"": ["*.json", "*.yaml", "schemas/*.json", "schemas/shared/*.json"]},
    extras_require={
        "tests": TEST_REQUIREMENTS,
    },
)

View File: source_convex/__init__.py

@@ -0,0 +1,8 @@
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#
from .source import SourceConvex
__all__ = ["SourceConvex"]

View File: source_convex/source.py

@@ -0,0 +1,194 @@
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

from datetime import datetime
from typing import Any, Dict, Iterable, Iterator, List, Mapping, MutableMapping, Optional, Tuple, TypedDict

import requests
from airbyte_cdk.sources import AbstractSource
from airbyte_cdk.sources.streams import IncrementalMixin, Stream
from airbyte_cdk.sources.streams.http import HttpStream
from airbyte_cdk.sources.streams.http.requests_native_auth.token import TokenAuthenticator

ConvexConfig = TypedDict(
    "ConvexConfig",
    {
        "deployment_url": str,
        "access_key": str,
    },
)

ConvexState = TypedDict(
    "ConvexState",
    {
        "snapshot_cursor": Optional[str],
        "snapshot_has_more": bool,
        "delta_cursor": Optional[int],
    },
)


# Source
class SourceConvex(AbstractSource):
    def _json_schemas(self, config: ConvexConfig) -> requests.Response:
        deployment_url = config["deployment_url"]
        access_key = config["access_key"]
        url = f"{deployment_url}/api/json_schemas?deltaSchema=true&format=convex_json"
        headers = {"Authorization": f"Convex {access_key}"}
        return requests.get(url, headers=headers)

    def check_connection(self, logger: Any, config: ConvexConfig) -> Tuple[bool, Any]:
        """
        Connection check to validate that the user-provided config can be used to connect to the underlying API

        :param config: the user-input config object conforming to the connector's spec.yaml
        :param logger: logger object
        :return Tuple[bool, any]: (True, None) if the input config can be used to connect to the API successfully, (False, error) otherwise.
        """
        resp = self._json_schemas(config)
        if resp.status_code == 200:
            return True, None
        else:
            return False, resp.text

    def streams(self, config: ConvexConfig) -> List[Stream]:
        """
        :param config: A Mapping of the user input configuration as defined in the connector spec.
        """
        resp = self._json_schemas(config)
        assert resp.status_code == 200
        json_schemas = resp.json()
        table_names = list(json_schemas.keys())
        return [
            ConvexStream(
                config["deployment_url"],
                config["access_key"],
                table_name,
                json_schemas[table_name],
            )
            for table_name in table_names
        ]

class ConvexStream(HttpStream, IncrementalMixin):
    def __init__(self, deployment_url: str, access_key: str, table_name: str, json_schema: Mapping[str, Any]):
        self.deployment_url = deployment_url
        self.table_name = table_name
        if json_schema:
            json_schema["properties"]["_ab_cdc_lsn"] = {"type": "number"}
            json_schema["properties"]["_ab_cdc_updated_at"] = {"type": "string"}
            json_schema["properties"]["_ab_cdc_deleted_at"] = {"anyOf": [{"type": "string"}, {"type": "null"}]}
        else:
            json_schema = {}
        self.json_schema = json_schema
        self._snapshot_cursor_value: Optional[str] = None
        self._snapshot_has_more = True
        self._delta_cursor_value: Optional[int] = None
        self._delta_has_more = True
        super().__init__(TokenAuthenticator(access_key, "Convex"))

    @property
    def name(self) -> str:
        return self.table_name

    @property
    def url_base(self) -> str:
        return self.deployment_url

    def get_json_schema(self) -> Mapping[str, Any]:
        return self.json_schema

    primary_key = "_id"
    cursor_field = "_ts"

    # Checkpoint stream reads after this many records. This prevents re-reading of data if the stream fails for any reason.
    state_checkpoint_interval = 128

    @property
    def state(self) -> ConvexState:
        return {
            "snapshot_cursor": self._snapshot_cursor_value,
            "snapshot_has_more": self._snapshot_has_more,
            "delta_cursor": self._delta_cursor_value,
        }

    @state.setter
    def state(self, value: ConvexState) -> None:
        self._snapshot_cursor_value = value["snapshot_cursor"]
        self._snapshot_has_more = value["snapshot_has_more"]
        self._delta_cursor_value = value["delta_cursor"]

    def next_page_token(self, response: requests.Response) -> Optional[ConvexState]:
        # Inner level of pagination shares the same state as outer,
        # and returns None to indicate that we're done.
        return self.state if self._delta_has_more else None

    def path(
        self,
        stream_state: Optional[ConvexState] = None,
        stream_slice: Optional[Mapping[str, Any]] = None,
        next_page_token: Optional[ConvexState] = None,
    ) -> str:
        # https://docs.convex.dev/http-api/#sync
        if self._snapshot_has_more:
            return "/api/list_snapshot"
        else:
            return "/api/document_deltas"

    def parse_response(
        self,
        response: requests.Response,
        stream_state: ConvexState,
        stream_slice: Optional[Mapping[str, Any]] = None,
        next_page_token: Optional[ConvexState] = None,
    ) -> Iterable[Any]:
        resp_json = response.json()
        if self._snapshot_has_more:
            self._snapshot_cursor_value = resp_json["cursor"]
            self._snapshot_has_more = resp_json["hasMore"]
            self._delta_cursor_value = resp_json["snapshot"]
        else:
            self._delta_cursor_value = resp_json["cursor"]
            self._delta_has_more = resp_json["hasMore"]
        return list(resp_json["values"])

    def request_params(
        self,
        stream_state: ConvexState,
        stream_slice: Optional[Mapping[str, Any]] = None,
        next_page_token: Optional[ConvexState] = None,
    ) -> MutableMapping[str, Any]:
        params: Dict[str, Any] = {"tableName": self.table_name, "format": "convex_json"}
        if self._snapshot_has_more:
            if self._snapshot_cursor_value:
                params["cursor"] = self._snapshot_cursor_value
            if self._delta_cursor_value:
                params["snapshot"] = self._delta_cursor_value
        else:
            if self._delta_cursor_value:
                params["cursor"] = self._delta_cursor_value
        return params

    def get_updated_state(self, current_stream_state: ConvexState, latest_record: Mapping[str, Any]) -> ConvexState:
        """
        This (deprecated) method is still used by AbstractSource to update state between calls to `read_records`.
        """
        return self.state

    def read_records(self, *args: Any, **kwargs: Any) -> Iterator[Any]:
        for record in super().read_records(*args, **kwargs):
            ts_ns = record["_ts"]
            ts_seconds = ts_ns / 1e9  # convert from nanoseconds.
            # equivalent of java's `new Timestamp(transactionMillis).toInstant().toString()`
            ts_datetime = datetime.utcfromtimestamp(ts_seconds)
            ts = ts_datetime.isoformat()
            # DebeziumEventUtils.CDC_LSN
            record["_ab_cdc_lsn"] = ts_ns
            # DebeziumEventUtils.CDC_UPDATED_AT
            record["_ab_cdc_updated_at"] = ts
            record["_deleted"] = record.get("_deleted", False)
            # DebeziumEventUtils.CDC_DELETED_AT
            record["_ab_cdc_deleted_at"] = ts if record["_deleted"] else None
            yield record

View File: source_convex/spec.yaml

@@ -0,0 +1,20 @@
documentationUrl: https://docs.airbyte.com/integrations/sources/convex
connectionSpecification:
  $schema: http://json-schema.org/draft-07/schema#
  title: Convex Source Spec
  type: object
  required:
    - deployment_url
    - access_key
  properties:
    deployment_url:
      type: string
      title: Deployment Url
      examples:
        - https://murky-swan-635.convex.cloud
        - https://cluttered-owl-337.convex.cloud
    access_key:
      type: string
      title: Access Key
      description: API access key used to retrieve data from Convex.
      airbyte_secret: true

View File: unit_tests/__init__.py

@@ -0,0 +1,3 @@
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

View File: unit_tests/test_incremental_streams.py

@@ -0,0 +1,83 @@
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

from unittest.mock import MagicMock

from airbyte_cdk.models import SyncMode
from pytest import fixture
from source_convex.source import ConvexStream


@fixture
def patch_incremental_base_class(mocker):
    # Mock abstract methods to enable instantiating abstract class
    mocker.patch.object(ConvexStream, "path", "v0/example_endpoint")
    mocker.patch.object(ConvexStream, "primary_key", "test_primary_key")
    mocker.patch.object(ConvexStream, "__abstractmethods__", set())


def test_cursor_field(patch_incremental_base_class):
    stream = ConvexStream("murky-swan-635", "accesskey", "messages", None)
    expected_cursor_field = "_ts"
    assert stream.cursor_field == expected_cursor_field


def test_get_updated_state(patch_incremental_base_class):
    stream = ConvexStream("murky-swan-635", "accesskey", "messages", None)
    resp = MagicMock()
    resp.json = lambda: {"values": [{"_id": "my_id", "field": "f", "_ts": 123}], "cursor": 1234, "snapshot": 3000, "hasMore": True}
    stream.parse_response(resp, {})
    assert stream.get_updated_state(None, None) == {
        "snapshot_cursor": 1234,
        "snapshot_has_more": True,
        "delta_cursor": 3000,
    }
    resp.json = lambda: {"values": [{"_id": "my_id", "field": "f", "_ts": 1235}], "cursor": 1235, "snapshot": 3000, "hasMore": False}
    stream.parse_response(resp, {})
    assert stream.get_updated_state(None, None) == {
        "snapshot_cursor": 1235,
        "snapshot_has_more": False,
        "delta_cursor": 3000,
    }
    resp.json = lambda: {"values": [{"_id": "my_id", "field": "f", "_ts": 1235}], "cursor": 8000, "hasMore": True}
    stream.parse_response(resp, {})
    assert stream.get_updated_state(None, None) == {
        "snapshot_cursor": 1235,
        "snapshot_has_more": False,
        "delta_cursor": 8000,
    }
    assert stream._delta_has_more is True
    resp.json = lambda: {"values": [{"_id": "my_id", "field": "f", "_ts": 1235}], "cursor": 9000, "hasMore": False}
    stream.parse_response(resp, {})
    assert stream.get_updated_state(None, None) == {
        "snapshot_cursor": 1235,
        "snapshot_has_more": False,
        "delta_cursor": 9000,
    }
    assert stream._delta_has_more is False


def test_stream_slices(patch_incremental_base_class):
    stream = ConvexStream("murky-swan-635", "accesskey", "messages", None)
    inputs = {"sync_mode": SyncMode.incremental, "cursor_field": [], "stream_state": {}}
    expected_stream_slice = [None]
    assert stream.stream_slices(**inputs) == expected_stream_slice


def test_supports_incremental(patch_incremental_base_class, mocker):
    mocker.patch.object(ConvexStream, "cursor_field", "dummy_field")
    stream = ConvexStream("murky-swan-635", "accesskey", "messages", None)
    assert stream.supports_incremental


def test_source_defined_cursor(patch_incremental_base_class):
    stream = ConvexStream("murky-swan-635", "accesskey", "messages", None)
    assert stream.source_defined_cursor


def test_stream_checkpoint_interval(patch_incremental_base_class):
    stream = ConvexStream("murky-swan-635", "accesskey", "messages", None)
    expected_checkpoint_interval = 128
    assert stream.state_checkpoint_interval == expected_checkpoint_interval

View File: unit_tests/test_source.py

@@ -0,0 +1,95 @@
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

from unittest.mock import MagicMock

import responses
from source_convex.source import SourceConvex


def setup_responses():
    sample_shapes_resp = {
        "posts": {
            "type": "object",
            "properties": {
                "_creationTime": {"type": "number"},
                "_id": {"$description": "Id(posts)", "type": "object", "properties": {"$id": {"type": "string"}}},
                "author": {"$description": "Id(users)", "type": "object", "properties": {"$id": {"type": "string"}}},
                "body": {"type": "string"},
                "_ts": {"type": "integer"},
                "_deleted": {"type": "boolean"},
            },
            "$schema": "http://json-schema.org/draft-07/schema#",
        },
        "users": {
            "type": "object",
            "properties": {
                "_creationTime": {"type": "number"},
                "_id": {"$description": "Id(users)", "type": "object", "properties": {"$id": {"type": "string"}}},
                "name": {"type": "string"},
                "tokenIdentifier": {"type": "string"},
                "_ts": {"type": "integer"},
                "_deleted": {"type": "boolean"},
            },
            "$schema": "http://json-schema.org/draft-07/schema#",
        },
    }
    responses.add(
        responses.GET,
        "https://murky-swan-635.convex.cloud/api/json_schemas?deltaSchema=true&format=convex_json",
        json=sample_shapes_resp,
    )


@responses.activate
def test_check_connection(mocker):
    setup_responses()
    source = SourceConvex()
    logger_mock = MagicMock()
    assert source.check_connection(
        logger_mock,
        {
            "deployment_url": "https://murky-swan-635.convex.cloud",
            "access_key": "test_api_key",
        },
    ) == (True, None)


@responses.activate
def test_streams(mocker):
    setup_responses()
    source = SourceConvex()
    streams = source.streams(
        {
            "deployment_url": "https://murky-swan-635.convex.cloud",
            "access_key": "test_api_key",
        }
    )
    assert len(streams) == 2
    streams.sort(key=lambda stream: stream.table_name)
    assert streams[0].table_name == "posts"
    assert streams[1].table_name == "users"
    assert all(stream.deployment_url == "https://murky-swan-635.convex.cloud" for stream in streams)
    assert all(stream._session.auth.get_auth_header() == {"Authorization": "Convex test_api_key"} for stream in streams)
    shapes = [stream.get_json_schema() for stream in streams]
    assert all(shape["type"] == "object" for shape in shapes)
    properties = [shape["properties"] for shape in shapes]
    assert all(props["_id"]["type"] == "object" for props in properties)
    assert all(props["_id"]["properties"] == {"$id": {"type": "string"}} for props in properties)
    assert all(props["_ts"] == {"type": "integer"} for props in properties)
    assert all(props["_creationTime"] == {"type": "number"} for props in properties)
    assert set(properties[0].keys()) == set(
        ["_id", "_ts", "_deleted", "_creationTime", "author", "body", "_ab_cdc_lsn", "_ab_cdc_updated_at", "_ab_cdc_deleted_at"]
    )
    assert set(properties[1].keys()) == set(
        ["_id", "_ts", "_deleted", "_creationTime", "name", "tokenIdentifier", "_ab_cdc_lsn", "_ab_cdc_updated_at", "_ab_cdc_deleted_at"]
    )

View File: unit_tests/test_streams.py

@@ -0,0 +1,106 @@
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

from http import HTTPStatus
from unittest.mock import MagicMock

import pytest
from source_convex.source import ConvexStream


@pytest.fixture
def patch_base_class(mocker):
    # Mock abstract methods to enable instantiating abstract class
    mocker.patch.object(ConvexStream, "path", "v0/example_endpoint")
    mocker.patch.object(ConvexStream, "primary_key", "test_primary_key")
    mocker.patch.object(ConvexStream, "__abstractmethods__", set())


def test_request_params(patch_base_class):
    stream = ConvexStream("murky-swan-635", "accesskey", "messages", None)
    inputs = {"stream_slice": None, "stream_state": None, "next_page_token": None}
    expected_params = {"tableName": "messages", "format": "convex_json"}
    assert stream.request_params(**inputs) == expected_params
    stream._snapshot_cursor_value = 1234
    expected_params = {"tableName": "messages", "format": "convex_json", "cursor": 1234}
    assert stream.request_params(**inputs) == expected_params
    stream._snapshot_has_more = False
    stream._delta_cursor_value = 2345
    expected_params = {"tableName": "messages", "format": "convex_json", "cursor": 2345}
    assert stream.request_params(**inputs) == expected_params


def test_next_page_token(patch_base_class):
    stream = ConvexStream("murky-swan-635", "accesskey", "messages", None)
    resp = MagicMock()
    resp.json = lambda: {"values": [{"_id": "my_id", "field": "f", "_ts": 123}], "cursor": 1234, "snapshot": 5000, "hasMore": True}
    stream.parse_response(resp, {})
    assert stream.next_page_token(resp) == {
        "snapshot_cursor": 1234,
        "snapshot_has_more": True,
        "delta_cursor": 5000,
    }
    resp.json = lambda: {"values": [{"_id": "my_id", "field": "f", "_ts": 1235}], "cursor": 1235, "snapshot": 5000, "hasMore": False}
    stream.parse_response(resp, {})
    assert stream.next_page_token(resp) == {
        "snapshot_cursor": 1235,
        "snapshot_has_more": False,
        "delta_cursor": 5000,
    }
    resp.json = lambda: {"values": [{"_id": "my_id", "field": "f", "_ts": 1235}], "cursor": 6000, "hasMore": True}
    stream.parse_response(resp, {})
    assert stream.next_page_token(resp) == {
        "snapshot_cursor": 1235,
        "snapshot_has_more": False,
        "delta_cursor": 6000,
    }
    resp.json = lambda: {"values": [{"_id": "my_id", "field": "f", "_ts": 1235}], "cursor": 7000, "hasMore": False}
    stream.parse_response(resp, {})
    assert stream.next_page_token(resp) is None
    assert stream.state == {"snapshot_cursor": 1235, "snapshot_has_more": False, "delta_cursor": 7000}


def test_parse_response(patch_base_class):
    stream = ConvexStream("murky-swan-635", "accesskey", "messages", None)
    resp = MagicMock()
    resp.json = lambda: {"values": [{"_id": "my_id", "field": "f", "_ts": 1234}], "cursor": 1234, "snapshot": 2000, "hasMore": True}
    inputs = {"response": resp, "stream_state": {}}
    expected_parsed_objects = [{"_id": "my_id", "field": "f", "_ts": 1234}]
    assert stream.parse_response(**inputs) == expected_parsed_objects
    assert stream.state == {"snapshot_cursor": 1234, "snapshot_has_more": True, "delta_cursor": 2000}


def test_request_headers(patch_base_class):
    stream = ConvexStream("murky-swan-635", "accesskey", "messages", None)
    inputs = {"stream_slice": None, "stream_state": None, "next_page_token": None}
    assert stream.request_headers(**inputs) == {}


def test_http_method(patch_base_class):
    stream = ConvexStream("murky-swan-635", "accesskey", "messages", None)
    expected_method = "GET"
    assert stream.http_method == expected_method


@pytest.mark.parametrize(
    ("http_status", "should_retry"),
    [
        (HTTPStatus.OK, False),
        (HTTPStatus.BAD_REQUEST, False),
        (HTTPStatus.TOO_MANY_REQUESTS, True),
        (HTTPStatus.INTERNAL_SERVER_ERROR, True),
    ],
)
def test_should_retry(patch_base_class, http_status, should_retry):
    response_mock = MagicMock()
    response_mock.status_code = http_status
    stream = ConvexStream("murky-swan-635", "accesskey", "messages", None)
    assert stream.should_retry(response_mock) == should_retry


def test_backoff_time(patch_base_class):
    response_mock = MagicMock()
    stream = ConvexStream("murky-swan-635", "accesskey", "messages", None)
    expected_backoff_time = None
    assert stream.backoff_time(response_mock) == expected_backoff_time