SAT: DX improvements, better error handling and more (#4260)
small fixes for SAT for better DX: - better stack trace in case of error inside the connector, print only relevant information with proper formatting (multiline stack trace instead of single string) - better logging - print message about image pulling only when it actually happens, stop tests if image not found - using discovery command for json_schema, when configured_catalog will be loaded we populate `json_schema` from a schema that we get from discovery command, the result is cached for all session duration. - better record comparison, takes care of lists inside dicts - because lists are unordered we will have false negatives when compare serialized records. - copied pytest config to airbyte root folder, so when pytest runs tests locally it can find it, this will affect all local execution of pytest - add IPython as a standard debugger Co-authored-by: Eugene Kulak <kulak.eugene@gmail.com>
This commit is contained in:
@@ -8,7 +8,7 @@ COPY setup.py ./
|
||||
COPY pytest.ini ./
|
||||
RUN pip install .
|
||||
|
||||
LABEL io.airbyte.version=0.1.3
|
||||
LABEL io.airbyte.version=0.1.4
|
||||
LABEL io.airbyte.name=airbyte/source-acceptance-test
|
||||
|
||||
ENTRYPOINT ["python", "-m", "pytest", "-p", "source_acceptance_test.plugin"]
|
||||
|
||||
@@ -29,9 +29,9 @@ MAIN_REQUIREMENTS = [
|
||||
"airbyte-cdk~=0.1",
|
||||
"docker~=4.4",
|
||||
"PyYAML~=5.4",
|
||||
"inflection~=0.5",
|
||||
"icdiff~=1.9",
|
||||
"pendulum~=1.2",
|
||||
"inflection~=0.5",
|
||||
"pdbpp~=0.10",
|
||||
"pydantic~=1.6",
|
||||
"pytest~=6.1",
|
||||
"pytest-sugar~=0.9",
|
||||
|
||||
@@ -29,7 +29,8 @@ from pathlib import Path
|
||||
from typing import Any, List, MutableMapping, Optional
|
||||
|
||||
import pytest
|
||||
from airbyte_cdk.models import AirbyteCatalog, AirbyteRecordMessage, ConfiguredAirbyteCatalog, ConnectorSpecification
|
||||
from airbyte_cdk.models import AirbyteCatalog, AirbyteRecordMessage, ConfiguredAirbyteCatalog, ConnectorSpecification, Type
|
||||
from docker import errors
|
||||
from source_acceptance_test.config import Config
|
||||
from source_acceptance_test.utils import ConnectorRunner, SecretDict, load_config
|
||||
|
||||
@@ -75,9 +76,12 @@ def configured_catalog_path_fixture(inputs, base_path) -> Optional[str]:
|
||||
|
||||
|
||||
@pytest.fixture(name="configured_catalog")
|
||||
def configured_catalog_fixture(configured_catalog_path) -> Optional[ConfiguredAirbyteCatalog]:
|
||||
def configured_catalog_fixture(configured_catalog_path, catalog_schemas) -> Optional[ConfiguredAirbyteCatalog]:
|
||||
if configured_catalog_path:
|
||||
return ConfiguredAirbyteCatalog.parse_file(configured_catalog_path)
|
||||
catalog = ConfiguredAirbyteCatalog.parse_file(configured_catalog_path)
|
||||
for configured_stream in catalog.streams:
|
||||
configured_stream.stream.json_schema = catalog_schemas.get(configured_stream.stream.name, {})
|
||||
return catalog
|
||||
return None
|
||||
|
||||
|
||||
@@ -128,9 +132,12 @@ def docker_runner_fixture(image_tag, tmp_path) -> ConnectorRunner:
|
||||
@pytest.fixture(scope="session", autouse=True)
|
||||
def pull_docker_image(acceptance_test_config) -> None:
|
||||
"""Startup fixture to pull docker image"""
|
||||
print("Pulling docker image", acceptance_test_config.connector_image)
|
||||
ConnectorRunner(image_name=acceptance_test_config.connector_image, volume=Path("."))
|
||||
print("Pulling completed")
|
||||
image_name = acceptance_test_config.connector_image
|
||||
config_filename = "acceptance-test-config.yml"
|
||||
try:
|
||||
ConnectorRunner(image_name=image_name, volume=Path("."))
|
||||
except errors.ImageNotFound:
|
||||
pytest.exit(f"Docker image `{image_name}` not found, please check your {config_filename} file", returncode=1)
|
||||
|
||||
|
||||
@pytest.fixture(name="expected_records")
|
||||
@@ -141,3 +148,21 @@ def expected_records_fixture(inputs, base_path) -> List[AirbyteRecordMessage]:
|
||||
|
||||
with open(str(base_path / getattr(expect_records, "path"))) as f:
|
||||
return [AirbyteRecordMessage.parse_raw(line) for line in f]
|
||||
|
||||
|
||||
@pytest.fixture(name="cached_schemas", scope="session")
|
||||
def cached_schemas_fixture() -> MutableMapping[str, Any]:
|
||||
"""Simple cache for discovered catalog: stream_name -> json_schema"""
|
||||
return {}
|
||||
|
||||
|
||||
@pytest.fixture(name="catalog_schemas")
|
||||
def catalog_schemas_fixture(connector_config, docker_runner: ConnectorRunner, cached_schemas) -> MutableMapping[str, Any]:
|
||||
"""JSON schemas for each stream"""
|
||||
if not cached_schemas:
|
||||
output = docker_runner.call_discover(config=connector_config)
|
||||
catalogs = [message.catalog for message in output if message.type == Type.CATALOG]
|
||||
for stream in catalogs[-1].streams:
|
||||
cached_schemas[stream.name] = stream.json_schema
|
||||
|
||||
return cached_schemas
|
||||
|
||||
@@ -23,7 +23,6 @@
|
||||
#
|
||||
|
||||
|
||||
import json
|
||||
from collections import Counter, defaultdict
|
||||
from typing import Any, List, Mapping, MutableMapping
|
||||
|
||||
@@ -32,7 +31,7 @@ from airbyte_cdk.models import AirbyteMessage, ConnectorSpecification, Status, T
|
||||
from docker.errors import ContainerError
|
||||
from source_acceptance_test.base import BaseTest
|
||||
from source_acceptance_test.config import BasicReadTestConfig, ConnectionTestConfig
|
||||
from source_acceptance_test.utils import ConnectorRunner
|
||||
from source_acceptance_test.utils import ConnectorRunner, serialize
|
||||
|
||||
|
||||
@pytest.mark.timeout(10)
|
||||
@@ -152,8 +151,8 @@ class TestBasicRead(BaseTest):
|
||||
r2 = TestBasicRead.remove_extra_fields(r2, r1)
|
||||
assert r1 == r2, f"Stream {stream_name}: Mismatch of record order or values"
|
||||
else:
|
||||
expected = set(map(TestBasicRead.serialize_record_for_comparison, expected))
|
||||
actual = set(map(TestBasicRead.serialize_record_for_comparison, actual))
|
||||
expected = set(map(serialize, expected))
|
||||
actual = set(map(serialize, actual))
|
||||
missing_expected = set(expected) - set(actual)
|
||||
|
||||
assert not missing_expected, f"Stream {stream_name}: All expected records must be produced"
|
||||
@@ -170,7 +169,3 @@ class TestBasicRead(BaseTest):
|
||||
result[record.stream].append(record.data)
|
||||
|
||||
return result
|
||||
|
||||
@staticmethod
|
||||
def serialize_record_for_comparison(record: Mapping) -> str:
|
||||
return json.dumps(record, sort_keys=True)
|
||||
|
||||
@@ -23,13 +23,10 @@
|
||||
#
|
||||
|
||||
|
||||
import json
|
||||
from functools import partial
|
||||
|
||||
import pytest
|
||||
from airbyte_cdk.models import Type
|
||||
from source_acceptance_test.base import BaseTest
|
||||
from source_acceptance_test.utils import ConnectorRunner, full_refresh_only_catalog
|
||||
from source_acceptance_test.utils import ConnectorRunner, full_refresh_only_catalog, serialize
|
||||
|
||||
|
||||
@pytest.mark.timeout(20 * 60)
|
||||
@@ -41,7 +38,6 @@ class TestFullRefresh(BaseTest):
|
||||
|
||||
output = docker_runner.call_read(connector_config, configured_catalog)
|
||||
records_2 = [message.record.data for message in output if message.type == Type.RECORD]
|
||||
serialize = partial(json.dumps, sort_keys=True)
|
||||
|
||||
assert not (
|
||||
set(map(serialize, records_1)) - set(map(serialize, records_2))
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
from .common import SecretDict, filter_output, full_refresh_only_catalog, incremental_only_catalog, load_config
|
||||
from .compare import diff_dicts
|
||||
from .compare import diff_dicts, serialize
|
||||
from .connector_runner import ConnectorRunner
|
||||
from .json_schema_helper import JsonSchemaHelper
|
||||
|
||||
@@ -12,4 +12,5 @@ __all__ = [
|
||||
"SecretDict",
|
||||
"ConnectorRunner",
|
||||
"diff_dicts",
|
||||
"serialize",
|
||||
]
|
||||
|
||||
@@ -23,7 +23,8 @@
|
||||
#
|
||||
|
||||
|
||||
from typing import List, Optional
|
||||
import json
|
||||
from typing import List, Mapping, Optional
|
||||
|
||||
import icdiff
|
||||
import py
|
||||
@@ -67,3 +68,12 @@ def diff_dicts(left, right, use_markup) -> Optional[List[str]]:
|
||||
icdiff_lines = list(differ.make_table(pretty_left, pretty_right, context=True))
|
||||
|
||||
return ["equals failed"] + [color_off + line for line in icdiff_lines]
|
||||
|
||||
|
||||
def serialize(value) -> str:
|
||||
"""Simplify comparison of nested dicts/lists"""
|
||||
if isinstance(value, Mapping):
|
||||
return json.dumps({k: serialize(v) for k, v in value.items()}, sort_keys=True)
|
||||
if isinstance(value, List):
|
||||
return sorted([serialize(v) for v in value])
|
||||
return str(value)
|
||||
|
||||
@@ -30,6 +30,7 @@ from typing import Iterable, List, Mapping, Optional
|
||||
|
||||
import docker
|
||||
from airbyte_cdk.models import AirbyteMessage, ConfiguredAirbyteCatalog
|
||||
from docker.errors import ContainerError
|
||||
from pydantic import ValidationError
|
||||
|
||||
|
||||
@@ -39,7 +40,9 @@ class ConnectorRunner:
|
||||
try:
|
||||
self._image = self._client.images.get(image_name)
|
||||
except docker.errors.ImageNotFound:
|
||||
print("Pulling docker image", image_name)
|
||||
self._image = self._client.images.pull(image_name)
|
||||
print("Pulling completed")
|
||||
self._runs = 0
|
||||
self._volume_base = volume
|
||||
|
||||
@@ -107,10 +110,17 @@ class ConnectorRunner:
|
||||
def run(self, cmd, config=None, state=None, catalog=None, **kwargs) -> Iterable[AirbyteMessage]:
|
||||
self._runs += 1
|
||||
volumes = self._prepare_volumes(config, state, catalog)
|
||||
logs = self._client.containers.run(
|
||||
image=self._image, command=cmd, working_dir="/data", volumes=volumes, network="host", stdout=True, stderr=True, **kwargs
|
||||
)
|
||||
logging.info("Docker run: \n%s\ninput: %s\noutput: %s", cmd, self.input_folder, self.output_folder)
|
||||
try:
|
||||
logs = self._client.containers.run(
|
||||
image=self._image, command=cmd, working_dir="/data", volumes=volumes, network="host", stdout=True, stderr=True, **kwargs
|
||||
)
|
||||
except ContainerError as err:
|
||||
# beautify error from container
|
||||
patched_error = ContainerError(
|
||||
container=err.container, exit_status=err.exit_status, command=err.command, image=err.image, stderr=err.stderr.decode()
|
||||
)
|
||||
raise patched_error from None # get rid of any previous exception stack
|
||||
|
||||
with open(str(self.output_folder / "raw"), "wb+") as f:
|
||||
f.write(logs)
|
||||
|
||||
3
pytest.ini
Normal file
3
pytest.ini
Normal file
@@ -0,0 +1,3 @@
|
||||
[pytest]
|
||||
|
||||
addopts = -r a --capture=no -vv --log-level=INFO --color=yes
|
||||
Reference in New Issue
Block a user