1
0
mirror of synced 2026-01-07 00:05:48 -05:00

🐙 octavia-cli: add command to list existing sources, destinations and connections (#9642)

This commit is contained in:
Augustin
2022-01-25 10:12:14 +01:00
committed by GitHub
parent 0dfbfdc2e3
commit e05dfd1bcd
9 changed files with 444 additions and 281 deletions

View File

@@ -38,6 +38,7 @@ We welcome community contributions!
| Date | Milestone |
|------------|-------------------------------------|
| 2022-01-19 | Implement `octavia list workspace sources`, `octavia list workspace destinations`, `octavia list workspace connections`|
| 2022-01-17 | Implement `octavia list connectors source` and `octavia list connectors destinations`|
| 2022-01-17 | Generate an API Python client from our Open API spec |
| 2021-12-22 | Bootstrapping the project's code base |

View File

@@ -6,7 +6,7 @@ from typing import List
import click
from .connectors_definitions import DestinationConnectorsDefinitions, SourceConnectorsDefinitions
from .listings import Connections, DestinationConnectorsDefinitions, Destinations, SourceConnectorsDefinitions, Sources
@click.group("list", help="List existing Airbyte resources.")
@@ -21,23 +21,56 @@ def connectors(ctx: click.Context): # pragma: no cover
pass
@connectors.command(help="Latest information on supported sources.")
@click.group("workspace", help="Latest information on workspace's sources and destinations.")
@click.pass_context
def sources(ctx: click.Context):
def workspace(ctx: click.Context): # pragma: no cover
pass
@connectors.command(name="sources", help="Latest information on supported sources.")
@click.pass_context
def sources_connectors(ctx: click.Context):
api_client = ctx.obj["API_CLIENT"]
definitions = SourceConnectorsDefinitions(api_client)
click.echo(definitions)
@connectors.command(help="Latest information on supported destinations.")
@connectors.command(name="destination", help="Latest information on supported destinations.")
@click.pass_context
def destinations(ctx: click.Context):
def destinations_connectors(ctx: click.Context):
api_client = ctx.obj["API_CLIENT"]
definitions = DestinationConnectorsDefinitions(api_client)
click.echo(definitions)
AVAILABLE_COMMANDS: List[click.Command] = [connectors]
@workspace.command(help="List existing sources in a workspace.")
@click.pass_context
def sources(ctx: click.Context):
api_client = ctx.obj["API_CLIENT"]
workspace_id = ctx.obj["WORKSPACE_ID"]
sources = Sources(api_client, workspace_id)
click.echo(sources)
@workspace.command(help="List existing destinations in a workspace.")
@click.pass_context
def destinations(ctx: click.Context):
api_client = ctx.obj["API_CLIENT"]
workspace_id = ctx.obj["WORKSPACE_ID"]
destinations = Destinations(api_client, workspace_id)
click.echo(destinations)
@workspace.command(help="List existing connections in a workspace.")
@click.pass_context
def connections(ctx: click.Context):
api_client = ctx.obj["API_CLIENT"]
workspace_id = ctx.obj["WORKSPACE_ID"]
connections = Connections(api_client, workspace_id)
click.echo(connections)
AVAILABLE_COMMANDS: List[click.Command] = [connectors, workspace]
def add_commands_to_list():

View File

@@ -1,121 +0,0 @@
#
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
#
import abc
from enum import Enum
from typing import Callable, List, Union
import airbyte_api_client
from airbyte_api_client.api import destination_definition_api, source_definition_api
class DefinitionType(Enum):
SOURCE = "source"
DESTINATION = "destination"
class ConnectorsDefinitions(abc.ABC):
LIST_LATEST_DEFINITIONS_KWARGS = {"_check_return_type": False}
@property
@abc.abstractmethod
def api(
self,
) -> Union[source_definition_api.SourceDefinitionApi, destination_definition_api.DestinationDefinitionApi]: # pragma: no cover
pass
def __init__(self, definition_type: DefinitionType, api_client: airbyte_api_client.ApiClient, list_latest_definitions: Callable):
self.definition_type = definition_type
self.api_instance = self.api(api_client)
self.list_latest_definitions = list_latest_definitions
@property
def fields_to_display(self) -> List[str]:
return ["name", "dockerRepository", "dockerImageTag", f"{self.definition_type.value}DefinitionId"]
@property
def response_definition_list_field(self) -> str:
return f"{self.definition_type.value}_definitions"
def _parse_response(self, api_response) -> List[List[str]]:
definitions = [
[definition[field] for field in self.fields_to_display] for definition in api_response[self.response_definition_list_field]
]
return definitions
@property
def latest_definitions(self) -> List[List[str]]:
api_response = self.list_latest_definitions(self.api_instance, **self.LIST_LATEST_DEFINITIONS_KWARGS)
return self._parse_response(api_response)
# TODO alafanechere: declare in a specific formatting module because it will probably be reused
@staticmethod
def _compute_col_width(data: List[List[str]], padding: int = 2) -> int:
"""Compute column width for display purposes:
Find largest column size, add a padding of two characters.
Returns:
data (List[List[str]]): Tabular data containing rows and columns.
padding (int): Number of character to adds to create space between columns.
Returns:
col_width (int): The computed column width according to input data.
"""
col_width = max(len(col) for row in data for col in row) + padding
return col_width
# TODO alafanechere: declare in a specific formatting module because it will probably be reused
@staticmethod
def _camelcased_to_uppercased_spaced(camelcased: str) -> str:
"""Util function to transform a camelCase string to a UPPERCASED SPACED string
e.g: dockerImageName -> DOCKER IMAGE NAME
Args:
camelcased (str): The camel cased string to convert.
Returns:
(str): The converted UPPERCASED SPACED string
"""
return "".join(map(lambda x: x if x.islower() else " " + x, camelcased)).upper()
# TODO alafanechere: declare in a specific formatting module because it will probably be reused
@staticmethod
def _display_as_table(data: List[List[str]]) -> str:
"""Formats tabular input data into a displayable table with columns.
Args:
data (List[List[str]]): Tabular data containing rows and columns.
Returns:
table (str): String representation of input tabular data.
"""
col_width = ConnectorsDefinitions._compute_col_width(data)
table = "\n".join(["".join(col.ljust(col_width) for col in row) for row in data])
return table
# TODO alafanechere: declare in a specific formatting module because it will probably be reused
@staticmethod
def _format_column_names(camelcased_column_names: List[str]) -> List[str]:
"""Format camel cased column names to uppercased spaced column names
Args:
camelcased_column_names (List[str]): Column names in camel case.
Returns:
(List[str]): Column names in uppercase with spaces.
"""
return [ConnectorsDefinitions._camelcased_to_uppercased_spaced(column_name) for column_name in camelcased_column_names]
def __repr__(self):
definitions = [self._format_column_names(self.fields_to_display)] + self.latest_definitions
return self._display_as_table(definitions)
class SourceConnectorsDefinitions(ConnectorsDefinitions):
api = source_definition_api.SourceDefinitionApi
def __init__(self, api_client: airbyte_api_client.ApiClient):
super().__init__(DefinitionType.SOURCE, api_client, self.api.list_latest_source_definitions)
class DestinationConnectorsDefinitions(ConnectorsDefinitions):
api = destination_definition_api.DestinationDefinitionApi
def __init__(self, api_client: airbyte_api_client.ApiClient):
super().__init__(DefinitionType.DESTINATION, api_client, self.api.list_latest_destination_definitions)

View File

@@ -0,0 +1,59 @@
#
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
#
from typing import List
def compute_columns_width(data: List[List[str]], padding: int = 2) -> List[int]:
"""Compute columns width for display purposes:
Find size for each columns in the data and add padding.
Args:
data (List[List[str]]): Tabular data containing rows and columns.
padding (int): Number of character to adds to create space between columns.
Returns:
columns_width (List[int]): The computed columns widths for each column according to input data.
"""
columns_width = [0 for _ in data[0]]
for row in data:
for i, col in enumerate(row):
current_col_width = len(col) + padding
if current_col_width > columns_width[i]:
columns_width[i] = current_col_width
return columns_width
def camelcased_to_uppercased_spaced(camelcased: str) -> str:
"""Util function to transform a camelCase string to a UPPERCASED SPACED string
e.g: dockerImageName -> DOCKER IMAGE NAME
Args:
camelcased (str): The camel cased string to convert.
Returns:
(str): The converted UPPERCASED SPACED string
"""
return "".join(map(lambda x: x if x.islower() else " " + x, camelcased)).upper()
def display_as_table(data: List[List[str]]) -> str:
"""Formats tabular input data into a displayable table with columns.
Args:
data (List[List[str]]): Tabular data containing rows and columns.
Returns:
table (str): String representation of input tabular data.
"""
columns_width = compute_columns_width(data)
table = "\n".join(["".join(col.ljust(columns_width[i]) for i, col in enumerate(row)) for row in data])
return table
def format_column_names(camelcased_column_names: List[str]) -> List[str]:
"""Format camel cased column names to uppercased spaced column names
Args:
camelcased_column_names (List[str]): Column names in camel case.
Returns:
(List[str]): Column names in uppercase with spaces.
"""
return [camelcased_to_uppercased_spaced(column_name) for column_name in camelcased_column_names]

View File

@@ -0,0 +1,111 @@
#
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
#
import abc
from typing import List
import airbyte_api_client
import octavia_cli.list.formatting as formatting
from airbyte_api_client.api import connection_api, destination_api, destination_definition_api, source_api, source_definition_api
from airbyte_api_client.model.workspace_id_request_body import WorkspaceIdRequestBody
class BaseListing(abc.ABC):
COMMON_LIST_FUNCTION_KWARGS = {"_check_return_type": False}
@property
@abc.abstractmethod
def api(
self,
): # pragma: no cover
pass
@property
@abc.abstractmethod
def fields_to_display(
self,
) -> List[str]: # pragma: no cover
pass
@property
@abc.abstractmethod
def list_field_in_response(
self,
) -> str: # pragma: no cover
pass
@property
@abc.abstractmethod
def list_function_name(
self,
) -> str: # pragma: no cover
pass
@property
def _list_fn(self):
return getattr(self.api, self.list_function_name)
@property
def list_function_kwargs(self) -> dict:
return {}
def __init__(self, api_client: airbyte_api_client.ApiClient):
self.api_instance = self.api(api_client)
def _parse_response(self, api_response) -> List[List[str]]:
items = [[item[field] for field in self.fields_to_display] for item in api_response[self.list_field_in_response]]
return items
def get_listing(self) -> List[List[str]]:
api_response = self._list_fn(self.api_instance, **self.list_function_kwargs, **self.COMMON_LIST_FUNCTION_KWARGS)
return self._parse_response(api_response)
def __repr__(self):
items = [formatting.format_column_names(self.fields_to_display)] + self.get_listing()
return formatting.display_as_table(items)
class SourceConnectorsDefinitions(BaseListing):
api = source_definition_api.SourceDefinitionApi
fields_to_display = ["name", "dockerRepository", "dockerImageTag", "sourceDefinitionId"]
list_field_in_response = "source_definitions"
list_function_name = "list_latest_source_definitions"
class DestinationConnectorsDefinitions(BaseListing):
api = destination_definition_api.DestinationDefinitionApi
fields_to_display = ["name", "dockerRepository", "dockerImageTag", "destinationDefinitionId"]
list_field_in_response = "destination_definitions"
list_function_name = "list_latest_destination_definitions"
class WorkspaceListing(BaseListing, abc.ABC):
def __init__(self, api_client: airbyte_api_client.ApiClient, workspace_id: str):
self.workspace_id = workspace_id
super().__init__(api_client)
@property
def list_function_kwargs(self) -> dict:
return {"workspace_id_request_body": WorkspaceIdRequestBody(workspace_id=self.workspace_id)}
class Sources(WorkspaceListing):
api = source_api.SourceApi
fields_to_display = ["name", "sourceName", "sourceId"]
list_field_in_response = "sources"
list_function_name = "list_sources_for_workspace"
class Destinations(WorkspaceListing):
api = destination_api.DestinationApi
fields_to_display = ["name", "destinationName", "destinationId"]
list_field_in_response = "destinations"
list_function_name = "list_destinations_for_workspace"
class Connections(WorkspaceListing):
api = connection_api.ConnectionApi
fields_to_display = ["name", "connectionId", "status", "sourceId", "destinationId"]
list_field_in_response = "connections"
list_function_name = "list_connections_for_workspace"

View File

@@ -7,7 +7,7 @@ from octavia_cli.list import commands
def test_available_commands():
assert commands.AVAILABLE_COMMANDS == [commands.connectors]
assert commands.AVAILABLE_COMMANDS == [commands.connectors, commands.workspace]
def test_commands_in_list_group():
@@ -20,7 +20,7 @@ def test_connectors_sources(mocker):
mocker.patch.object(commands, "SourceConnectorsDefinitions", mocker.Mock(return_value="SourceConnectorsDefinitionsRepr"))
context_object = {"API_CLIENT": mocker.Mock()}
runner = CliRunner()
result = runner.invoke((commands.sources), obj=context_object)
result = runner.invoke(commands.sources_connectors, obj=context_object)
commands.SourceConnectorsDefinitions.assert_called_with(context_object["API_CLIENT"])
assert result.output == "SourceConnectorsDefinitionsRepr\n"
@@ -29,6 +29,33 @@ def test_connectors_destinations(mocker):
mocker.patch.object(commands, "DestinationConnectorsDefinitions", mocker.Mock(return_value="DestinationConnectorsDefinitionsRepr"))
context_object = {"API_CLIENT": mocker.Mock()}
runner = CliRunner()
result = runner.invoke((commands.destinations), obj=context_object)
result = runner.invoke(commands.destinations_connectors, obj=context_object)
commands.DestinationConnectorsDefinitions.assert_called_with(context_object["API_CLIENT"])
assert result.output == "DestinationConnectorsDefinitionsRepr\n"
def test_sources(mocker):
mocker.patch.object(commands, "Sources", mocker.Mock(return_value="SourcesRepr"))
context_object = {"API_CLIENT": mocker.Mock(), "WORKSPACE_ID": "my_workspace_id"}
runner = CliRunner()
result = runner.invoke(commands.sources, obj=context_object)
commands.Sources.assert_called_with(context_object["API_CLIENT"], context_object["WORKSPACE_ID"])
assert result.output == "SourcesRepr\n"
def test_destinations(mocker):
mocker.patch.object(commands, "Destinations", mocker.Mock(return_value="DestinationsRepr"))
context_object = {"API_CLIENT": mocker.Mock(), "WORKSPACE_ID": "my_workspace_id"}
runner = CliRunner()
result = runner.invoke(commands.destinations, obj=context_object)
commands.Destinations.assert_called_with(context_object["API_CLIENT"], context_object["WORKSPACE_ID"])
assert result.output == "DestinationsRepr\n"
def test_connections(mocker):
mocker.patch.object(commands, "Connections", mocker.Mock(return_value="ConnectionsRepr"))
context_object = {"API_CLIENT": mocker.Mock(), "WORKSPACE_ID": "my_workspace_id"}
runner = CliRunner()
result = runner.invoke(commands.connections, obj=context_object)
commands.Connections.assert_called_with(context_object["API_CLIENT"], context_object["WORKSPACE_ID"])
assert result.output == "ConnectionsRepr\n"

View File

@@ -1,151 +0,0 @@
#
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
#
import pytest
from airbyte_api_client.api import destination_definition_api, source_definition_api
from octavia_cli.list.connectors_definitions import (
ConnectorsDefinitions,
DefinitionType,
DestinationConnectorsDefinitions,
SourceConnectorsDefinitions,
)
def test_definition_type():
assert [definition_type.value for definition_type in DefinitionType] == ["source", "destination"]
class TestConnectorsDefinitions:
@pytest.fixture
def mock_api(self, mocker):
return mocker.Mock()
@pytest.fixture
def patch_base_class(self, mocker, mock_api):
# Mock abstract methods to enable instantiating abstract class
mocker.patch.object(ConnectorsDefinitions, "api", mock_api)
mocker.patch.object(ConnectorsDefinitions, "__abstractmethods__", set())
@pytest.fixture
def connectors_definitions_mock_args(self, mocker):
return (mocker.Mock(value="my_definition_type"), mocker.Mock(), mocker.Mock())
def test_init(self, patch_base_class, mock_api, connectors_definitions_mock_args):
mock_definition_type, mock_api_client, mock_list_latest_definitions = connectors_definitions_mock_args
definitions = ConnectorsDefinitions(*connectors_definitions_mock_args)
assert definitions.definition_type == mock_definition_type
mock_api.assert_called_with(mock_api_client)
assert definitions.api_instance == mock_api.return_value
assert definitions.list_latest_definitions == mock_list_latest_definitions
def test_abstract_methods(self, connectors_definitions_mock_args):
assert ConnectorsDefinitions.__abstractmethods__ == {"api"}
with pytest.raises(TypeError):
ConnectorsDefinitions(*connectors_definitions_mock_args)
def test_fields_to_display(self, patch_base_class, connectors_definitions_mock_args):
definitions = ConnectorsDefinitions(*connectors_definitions_mock_args)
expected_field_to_display = ["name", "dockerRepository", "dockerImageTag", "my_definition_typeDefinitionId"]
assert definitions.fields_to_display == expected_field_to_display
def test_response_definition_list_field(self, patch_base_class, connectors_definitions_mock_args):
definitions = ConnectorsDefinitions(*connectors_definitions_mock_args)
expected_response_definition_list_field = "my_definition_type_definitions"
assert definitions.response_definition_list_field == expected_response_definition_list_field
def test_parse_response(self, patch_base_class, connectors_definitions_mock_args):
definitions = ConnectorsDefinitions(*connectors_definitions_mock_args)
api_response = {definitions.response_definition_list_field: []}
for i in range(5):
definition = {field: f"{field}_value_{i}" for field in definitions.fields_to_display}
definition["discarded_field"] = "discarded_value"
api_response[definitions.response_definition_list_field].append(definition)
parsed_definitions = definitions._parse_response(api_response)
assert len(parsed_definitions) == 5
for i in range(5):
assert parsed_definitions[i] == [f"{field}_value_{i}" for field in definitions.fields_to_display]
assert "discarded_value" not in parsed_definitions[i]
def test_latest_definitions(self, patch_base_class, mocker, connectors_definitions_mock_args):
mock_list_latest_definitions = connectors_definitions_mock_args[-1]
mocker.patch.object(ConnectorsDefinitions, "_parse_response")
definitions = ConnectorsDefinitions(*connectors_definitions_mock_args)
assert definitions.latest_definitions == definitions._parse_response.return_value
mock_list_latest_definitions.assert_called_with(definitions.api_instance, **definitions.LIST_LATEST_DEFINITIONS_KWARGS)
definitions._parse_response.assert_called_with(mock_list_latest_definitions.return_value)
def test_repr(self, patch_base_class, mocker, connectors_definitions_mock_args):
headers = ["fieldA", "fieldB", "fieldC"]
latest_definitions = [["a", "b", "c"]]
mocker.patch.object(ConnectorsDefinitions, "fields_to_display", headers)
mocker.patch.object(ConnectorsDefinitions, "latest_definitions", latest_definitions)
mocker.patch.object(ConnectorsDefinitions, "_display_as_table")
mocker.patch.object(ConnectorsDefinitions, "_format_column_names")
definitions = ConnectorsDefinitions(*connectors_definitions_mock_args)
representation = definitions.__repr__()
definitions._display_as_table.assert_called_with([definitions._format_column_names.return_value] + latest_definitions)
assert representation == definitions._display_as_table.return_value
@pytest.mark.parametrize(
"test_data,padding,expected_col_width",
[([["a", "___10chars"], ["e", "f"]], 2, 2 + 10), ([["a", "___10chars"], ["e", "____11chars"]], 2, 2 + 11), ([[""]], 2, 2)],
)
def test_compute_col_width(self, test_data, padding, expected_col_width):
col_width = ConnectorsDefinitions._compute_col_width(test_data, padding)
assert col_width == expected_col_width
@pytest.mark.parametrize(
"test_data,col_width,expected_output",
[
([["a", "___10chars"], ["e", "____11chars"]], 13, "a ___10chars \ne ____11chars "),
],
)
def test_display_as_table(self, mocker, test_data, col_width, expected_output):
mocker.patch.object(ConnectorsDefinitions, "_compute_col_width", mocker.Mock(return_value=col_width))
assert ConnectorsDefinitions._display_as_table(test_data) == expected_output
@pytest.mark.parametrize("input_camelcased,expected_output", [("camelCased", "CAMEL CASED"), ("notcamelcased", "NOTCAMELCASED")])
def test_camelcased_to_uppercased_spaced(self, input_camelcased, expected_output):
assert ConnectorsDefinitions._camelcased_to_uppercased_spaced(input_camelcased) == expected_output
def test_format_column_names(self, mocker):
columns_to_format = ["camelCased"]
formatted_columns = ConnectorsDefinitions._format_column_names(columns_to_format)
assert len(formatted_columns) == 1
for i, c in enumerate(formatted_columns):
assert c == ConnectorsDefinitions._camelcased_to_uppercased_spaced(columns_to_format[i])
class TestSubConnectorsDefinitions:
@pytest.fixture
def mock_api_client(self, mocker):
return mocker.Mock()
@pytest.mark.parametrize(
"definition_type,SubDefinitionClass,list_latest_definitions",
[
(DefinitionType.SOURCE, SourceConnectorsDefinitions, source_definition_api.SourceDefinitionApi.list_latest_source_definitions),
(
DefinitionType.DESTINATION,
DestinationConnectorsDefinitions,
destination_definition_api.DestinationDefinitionApi.list_latest_destination_definitions,
),
],
)
def test_init(self, mocker, mock_api_client, definition_type, SubDefinitionClass, list_latest_definitions):
definitions_init = mocker.Mock()
mocker.patch.object(ConnectorsDefinitions, "__init__", definitions_init)
SubDefinitionClass(mock_api_client)
definitions_init.assert_called_with(definition_type, mock_api_client, list_latest_definitions)
@pytest.mark.parametrize(
"SubDefinitionClass,expected_api",
[
(SourceConnectorsDefinitions, source_definition_api.SourceDefinitionApi),
(DestinationConnectorsDefinitions, destination_definition_api.DestinationDefinitionApi),
],
)
def test_class_attributes(self, SubDefinitionClass, expected_api):
assert SubDefinitionClass.api == expected_api

View File

@@ -0,0 +1,45 @@
#
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
#
import pytest
from octavia_cli.list import formatting
PADDING = 2
@pytest.mark.parametrize(
"test_data,expected_columns_width",
[
([["a", "___10chars"], ["e", "f"]], [1 + PADDING, 10 + PADDING]),
([["a", "___10chars"], ["e", "____11chars"]], [1 + PADDING, 11 + PADDING]),
([[""]], [PADDING]),
],
)
def test_compute_columns_width(test_data, expected_columns_width):
columns_width = formatting.compute_columns_width(test_data, PADDING)
assert columns_width == expected_columns_width
@pytest.mark.parametrize("input_camelcased,expected_output", [("camelCased", "CAMEL CASED"), ("notcamelcased", "NOTCAMELCASED")])
def test_camelcased_to_uppercased_spaced(input_camelcased, expected_output):
assert formatting.camelcased_to_uppercased_spaced(input_camelcased) == expected_output
@pytest.mark.parametrize(
"test_data,columns_width,expected_output",
[
([["a", "___10chars"], ["e", "____11chars"]], [1 + PADDING, 11 + PADDING], "a ___10chars \ne ____11chars "),
],
)
def test_display_as_table(mocker, test_data, columns_width, expected_output):
mocker.patch.object(formatting, "compute_columns_width", mocker.Mock(return_value=columns_width))
assert formatting.display_as_table(test_data) == expected_output
def test_format_column_names():
columns_to_format = ["camelCased"]
formatted_columns = formatting.format_column_names(columns_to_format)
assert len(formatted_columns) == 1
for i, c in enumerate(formatted_columns):
assert c == formatting.camelcased_to_uppercased_spaced(columns_to_format[i])

View File

@@ -0,0 +1,159 @@
#
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
#
import pytest
from airbyte_api_client.api import connection_api, destination_api, destination_definition_api, source_api, source_definition_api
from octavia_cli.list import listings
from octavia_cli.list.listings import (
BaseListing,
Connections,
DestinationConnectorsDefinitions,
Destinations,
SourceConnectorsDefinitions,
Sources,
WorkspaceListing,
)
@pytest.fixture
def mock_api_client(mocker):
return mocker.Mock()
class TestBaseListing:
@pytest.fixture
def patch_base_class(self, mocker):
# Mock abstract methods to enable instantiating abstract class
mocker.patch.object(BaseListing, "__abstractmethods__", set())
mocker.patch.object(BaseListing, "list_function_name", "my_list_function_name")
mocker.patch.object(BaseListing, "api", mocker.Mock(my_list_function_name=mocker.Mock()))
def test_init(self, patch_base_class, mock_api_client):
base_listing = BaseListing(mock_api_client)
assert base_listing._list_fn == BaseListing.api.my_list_function_name
assert base_listing.list_function_kwargs == {}
assert base_listing.api_instance == base_listing.api.return_value
base_listing.api.assert_called_with(mock_api_client)
assert base_listing.COMMON_LIST_FUNCTION_KWARGS == {"_check_return_type": False}
def test_abstract_methods(self, mock_api_client):
assert BaseListing.__abstractmethods__ == {"api", "fields_to_display", "list_field_in_response", "list_function_name"}
with pytest.raises(TypeError):
BaseListing(mock_api_client)
def test_parse_response(self, patch_base_class, mocker, mock_api_client):
mocker.patch.object(BaseListing, "fields_to_display", ["fieldA", "fieldB"])
base_listing = BaseListing(mock_api_client)
api_response = {base_listing.list_field_in_response: []}
for i in range(5):
definition = {field: f"{field}_value_{i}" for field in base_listing.fields_to_display}
definition["discarded_field"] = "discarded_value"
api_response[base_listing.list_field_in_response].append(definition)
parsed_listing = base_listing._parse_response(api_response)
assert len(parsed_listing) == 5
for i in range(5):
assert parsed_listing[i] == [f"{field}_value_{i}" for field in base_listing.fields_to_display]
assert "discarded_value" not in parsed_listing[i]
def test_gest_listing(self, patch_base_class, mocker, mock_api_client):
mocker.patch.object(BaseListing, "_parse_response")
mocker.patch.object(BaseListing, "_list_fn")
base_listing = BaseListing(mock_api_client)
listing = base_listing.get_listing()
base_listing._list_fn.assert_called_with(
base_listing.api_instance, **base_listing.list_function_kwargs, **base_listing.COMMON_LIST_FUNCTION_KWARGS
)
base_listing._parse_response.assert_called_with(base_listing._list_fn.return_value)
assert listing == base_listing._parse_response.return_value
def test_repr(self, patch_base_class, mocker, mock_api_client):
headers = ["fieldA", "fieldB", "fieldC"]
api_response_listing = [["a", "b", "c"]]
mocker.patch.object(BaseListing, "fields_to_display", headers)
mocker.patch.object(BaseListing, "get_listing", mocker.Mock(return_value=api_response_listing))
mocker.patch.object(listings, "formatting")
base_listing = BaseListing(mock_api_client)
representation = base_listing.__repr__()
listings.formatting.display_as_table.assert_called_with(
[listings.formatting.format_column_names.return_value] + api_response_listing
)
assert representation == listings.formatting.display_as_table.return_value
class TestSourceConnectorsDefinitions:
def test_init(self, mock_api_client):
assert SourceConnectorsDefinitions.__base__ == BaseListing
source_connectors_definition = SourceConnectorsDefinitions(mock_api_client)
assert source_connectors_definition.api == source_definition_api.SourceDefinitionApi
assert source_connectors_definition.fields_to_display == ["name", "dockerRepository", "dockerImageTag", "sourceDefinitionId"]
assert source_connectors_definition.list_field_in_response == "source_definitions"
assert source_connectors_definition.list_function_name == "list_latest_source_definitions"
class TestDestinationConnectorsDefinitions:
def test_init(self, mock_api_client):
assert DestinationConnectorsDefinitions.__base__ == BaseListing
destination_connectors_definition = DestinationConnectorsDefinitions(mock_api_client)
assert destination_connectors_definition.api == destination_definition_api.DestinationDefinitionApi
assert destination_connectors_definition.fields_to_display == [
"name",
"dockerRepository",
"dockerImageTag",
"destinationDefinitionId",
]
assert destination_connectors_definition.list_field_in_response == "destination_definitions"
assert destination_connectors_definition.list_function_name == "list_latest_destination_definitions"
class TestWorkspaceListing:
@pytest.fixture
def patch_base_class(self, mocker):
# Mock abstract methods to enable instantiating abstract class
mocker.patch.object(WorkspaceListing, "__abstractmethods__", set())
mocker.patch.object(WorkspaceListing, "api", mocker.Mock())
def test_init(self, patch_base_class, mocker, mock_api_client):
mocker.patch.object(listings, "WorkspaceIdRequestBody")
mocker.patch.object(BaseListing, "__init__")
assert WorkspaceListing.__base__ == BaseListing
sources_and_destinations = WorkspaceListing(mock_api_client, "my_workspace_id")
assert sources_and_destinations.workspace_id == "my_workspace_id"
assert sources_and_destinations.list_function_kwargs == {"workspace_id_request_body": listings.WorkspaceIdRequestBody.return_value}
listings.WorkspaceIdRequestBody.assert_called_with(workspace_id="my_workspace_id")
BaseListing.__init__.assert_called_with(mock_api_client)
def test_abstract(self, mock_api_client):
with pytest.raises(TypeError):
WorkspaceListing(mock_api_client)
class TestSources:
def test_init(self, mock_api_client):
assert Sources.__base__ == WorkspaceListing
sources = Sources(mock_api_client, "my_workspace_id")
assert sources.api == source_api.SourceApi
assert sources.fields_to_display == ["name", "sourceName", "sourceId"]
assert sources.list_field_in_response == "sources"
assert sources.list_function_name == "list_sources_for_workspace"
class TestDestinations:
def test_init(self, mock_api_client):
assert Destinations.__base__ == WorkspaceListing
destinations = Destinations(mock_api_client, "my_workspace_id")
assert destinations.api == destination_api.DestinationApi
assert destinations.fields_to_display == ["name", "destinationName", "destinationId"]
assert destinations.list_field_in_response == "destinations"
assert destinations.list_function_name == "list_destinations_for_workspace"
class TestConnections:
def test_init(self, mock_api_client):
assert Connections.__base__ == WorkspaceListing
connections = Connections(mock_api_client, "my_workspace_id")
assert connections.api == connection_api.ConnectionApi
assert connections.fields_to_display == ["name", "connectionId", "status", "sourceId", "destinationId"]
assert connections.list_field_in_response == "connections"
assert connections.list_function_name == "list_connections_for_workspace"