Test AirbyteEntrypoint.run (#3261)
This commit is contained in:
@@ -21,10 +21,21 @@
|
||||
# SOFTWARE.
|
||||
|
||||
|
||||
from argparse import Namespace
|
||||
from copy import deepcopy
|
||||
from typing import Any, List, Mapping
|
||||
from typing import Any, List, Mapping, MutableMapping, Union
|
||||
|
||||
import pytest
|
||||
from airbyte_cdk import (
|
||||
AirbyteCatalog,
|
||||
AirbyteConnectionStatus,
|
||||
AirbyteMessage,
|
||||
AirbyteRecordMessage,
|
||||
AirbyteStream,
|
||||
ConnectorSpecification,
|
||||
Status,
|
||||
Type,
|
||||
)
|
||||
from airbyte_cdk.base_python import AirbyteEntrypoint, Source
|
||||
|
||||
|
||||
@@ -76,10 +87,71 @@ def test_parse_valid_args(cmd: str, args: Mapping[str, Any], entrypoint: Airbyte
|
||||
("read", {"config": "config_path", "catalog": "catalog_path"}),
|
||||
],
|
||||
)
|
||||
def test_parse_missing_required_args(cmd: str, args: Mapping[str, Any], entrypoint: AirbyteEntrypoint):
|
||||
def test_parse_missing_required_args(cmd: str, args: MutableMapping[str, Any], entrypoint: AirbyteEntrypoint):
|
||||
required_args = {"check": ["config"], "discover": ["config"], "read": ["config", "catalog"]}
|
||||
for required_arg in required_args[cmd]:
|
||||
argcopy = deepcopy(args)
|
||||
del argcopy[required_arg]
|
||||
with pytest.raises(BaseException):
|
||||
entrypoint.parse_args(_as_arglist(cmd, argcopy))
|
||||
|
||||
|
||||
def _wrap_message(submessage: Union[AirbyteConnectionStatus, ConnectorSpecification, AirbyteRecordMessage, AirbyteCatalog]) -> str:
|
||||
if isinstance(submessage, AirbyteConnectionStatus):
|
||||
message = AirbyteMessage(type=Type.CONNECTION_STATUS, connectionStatus=submessage)
|
||||
elif isinstance(submessage, ConnectorSpecification):
|
||||
message = AirbyteMessage(type=Type.SPEC, spec=submessage)
|
||||
elif isinstance(submessage, AirbyteCatalog):
|
||||
message = AirbyteMessage(type=Type.CATALOG, catalog=submessage)
|
||||
elif isinstance(submessage, AirbyteRecordMessage):
|
||||
message = AirbyteMessage(type=Type.RECORD, record=submessage)
|
||||
else:
|
||||
raise Exception(f"Unknown message type: {submessage}")
|
||||
|
||||
return message.json(exclude_unset=True)
|
||||
|
||||
|
||||
def test_run_spec(entrypoint: AirbyteEntrypoint, mocker):
|
||||
parsed_args = Namespace(command="spec")
|
||||
expected = ConnectorSpecification(connectionSpecification={"hi": "hi"})
|
||||
mocker.patch.object(MockSource, "spec", return_value=expected)
|
||||
assert [_wrap_message(expected)] == list(entrypoint.run(parsed_args))
|
||||
|
||||
|
||||
def test_run_check(entrypoint: AirbyteEntrypoint, mocker):
|
||||
parsed_args = Namespace(command="check", config="config_path")
|
||||
config = {"username": "fake"}
|
||||
check_value = AirbyteConnectionStatus(status=Status.SUCCEEDED)
|
||||
mocker.patch.object(MockSource, "read_config", return_value=config)
|
||||
mocker.patch.object(MockSource, "configure", return_value=config)
|
||||
mocker.patch.object(MockSource, "check", return_value=check_value)
|
||||
assert [_wrap_message(check_value)] == list(entrypoint.run(parsed_args))
|
||||
|
||||
|
||||
def test_run_discover(entrypoint: AirbyteEntrypoint, mocker):
|
||||
parsed_args = Namespace(command="discover", config="config_path")
|
||||
config = {"username": "fake"}
|
||||
expected = AirbyteCatalog(streams=[AirbyteStream(name="stream", json_schema={"k": "v"})])
|
||||
mocker.patch.object(MockSource, "read_config", return_value=config)
|
||||
mocker.patch.object(MockSource, "configure", return_value=config)
|
||||
mocker.patch.object(MockSource, "discover", return_value=expected)
|
||||
assert [_wrap_message(expected)] == list(entrypoint.run(parsed_args))
|
||||
|
||||
|
||||
def test_run_read(entrypoint: AirbyteEntrypoint, mocker):
|
||||
parsed_args = Namespace(command="read", config="config_path", state="statepath", catalog="catalogpath")
|
||||
config = {"username": "fake"}
|
||||
expected = AirbyteRecordMessage(stream="stream", data={"data": "stuff"}, emitted_at=1)
|
||||
mocker.patch.object(MockSource, "read_config", return_value=config)
|
||||
mocker.patch.object(MockSource, "configure", return_value=config)
|
||||
mocker.patch.object(MockSource, "read_state", return_value={})
|
||||
mocker.patch.object(MockSource, "read_catalog", return_value={})
|
||||
mocker.patch.object(MockSource, "read", return_value=[AirbyteMessage(record=expected, type=Type.RECORD)])
|
||||
assert [_wrap_message(expected)] == list(entrypoint.run(parsed_args))
|
||||
|
||||
|
||||
def test_invalid_command(entrypoint: AirbyteEntrypoint, mocker):
|
||||
with pytest.raises(Exception):
|
||||
mocker.patch.object(MockSource, "read_config", return_value={})
|
||||
mocker.patch.object(MockSource, "configure", return_value={})
|
||||
list(entrypoint.run(Namespace(command="invalid", config="conf")))
|
||||
|
||||
Reference in New Issue
Block a user