diff --git a/airbyte-ci/connectors/live-tests/src/live_tests/commons/connector_runner.py b/airbyte-ci/connectors/live-tests/src/live_tests/commons/connector_runner.py
index 3e5838dd7d8..0e721346600 100644
--- a/airbyte-ci/connectors/live-tests/src/live_tests/commons/connector_runner.py
+++ b/airbyte-ci/connectors/live-tests/src/live_tests/commons/connector_runner.py
@@ -6,6 +6,8 @@ from __future__ import annotations
 import datetime
 import json
 import logging
+import os
+import subprocess
 import uuid
 from pathlib import Path
 from typing import Optional
@@ -23,11 +25,13 @@ class ConnectorRunner:
     IN_CONTAINER_CONFIGURED_CATALOG_PATH = "/data/catalog.json"
     IN_CONTAINER_STATE_PATH = "/data/state.json"
     IN_CONTAINER_OUTPUT_PATH = "/output.txt"
+    IN_CONTAINER_OBFUSCATOR_PATH = "/user/local/bin/record_obfuscator.py"

     def __init__(
         self,
         dagger_client: dagger.Client,
         execution_inputs: ExecutionInputs,
+        is_airbyte_ci: bool,
         http_proxy: Optional[Proxy] = None,
     ):
         self.connector_under_test = execution_inputs.connector_under_test
@@ -45,6 +49,11 @@ class ConnectorRunner:
         self.http_proxy = http_proxy
         self.logger = logging.getLogger(f"{self.connector_under_test.name}-{self.connector_under_test.version}")
         self.dagger_client = dagger_client.pipeline(f"{self.connector_under_test.name}-{self.connector_under_test.version}")
+        if is_airbyte_ci:
+            self.host_obfuscator_path = "/tmp/record_obfuscator.py"
+        else:
+            repo_root = Path(subprocess.check_output(["git", "rev-parse", "--show-toplevel"]).strip().decode())
+            self.host_obfuscator_path = f"{repo_root}/tools/bin/record_obfuscator.py"

     @property
     def _connector_under_test_container(self) -> dagger.Container:
@@ -109,6 +118,12 @@ class ConnectorRunner:
         container = self._connector_under_test_container
         # Do not cache downstream dagger layers
         container = container.with_env_variable("CACHEBUSTER", str(uuid.uuid4()))
+        expanded_host_executable_path = os.path.expanduser(self.host_obfuscator_path)
+
+        container = container.with_file(
+            self.IN_CONTAINER_OBFUSCATOR_PATH,
+            self.dagger_client.host().file(expanded_host_executable_path),
+        )
         for env_var_name, env_var_value in self.environment_variables.items():
             container = container.with_env_variable(env_var_name, env_var_value)
         if self.config:
@@ -134,7 +149,8 @@ class ConnectorRunner:
             [
                 "sh",
                 "-c",
-                " ".join(airbyte_command) + f" > {self.IN_CONTAINER_OUTPUT_PATH} 2>&1 | tee -a {self.IN_CONTAINER_OUTPUT_PATH}",
+                " ".join(airbyte_command)
+                + f"| {self.IN_CONTAINER_OBFUSCATOR_PATH} > {self.IN_CONTAINER_OUTPUT_PATH} 2>&1 | tee -a {self.IN_CONTAINER_OUTPUT_PATH}",
             ],
             skip_entrypoint=True,
         )
diff --git a/airbyte-ci/connectors/live-tests/src/live_tests/commons/models.py b/airbyte-ci/connectors/live-tests/src/live_tests/commons/models.py
index a8abbcfbe2d..469ca62a785 100644
--- a/airbyte-ci/connectors/live-tests/src/live_tests/commons/models.py
+++ b/airbyte-ci/connectors/live-tests/src/live_tests/commons/models.py
@@ -321,10 +321,36 @@ class ExecutionResult:
                 stream_schema_builder = SchemaBuilder()
                 stream_schema_builder.add_schema({"type": "object", "properties": {}})
                 stream_builders[stream] = stream_schema_builder
-            stream_builders[stream].add_object(record.record.data)
+            stream_builders[stream].add_object(self.get_obfuscated_types(record.record.data))
         self.logger.info("Stream schemas generated")
         return {stream: sort_dict_keys(stream_builders[stream].to_schema()) for stream in stream_builders}

+    @staticmethod
+    def get_obfuscated_types(data: dict[str, Any]) -> dict[str, Any]:
+        """
+        Convert obfuscated records into a record whose values have the same type as the original values.
+        """
+        types = {}
+        for k, v in data.items():
+            if v.startswith("string_"):
+                types[k] = "a"
+            elif v.startswith("integer_"):
+                types[k] = 0
+            elif v.startswith("number_"):
+                types[k] = 0.1
+            elif v.startswith("boolean_"):
+                types[k] = True
+            elif v.startswith("null_"):
+                types[k] = None
+            elif v.startswith("array_"):
+                types[k] = []
+            elif v.startswith("object_"):
+                types[k] = {}
+            else:
+                types[k] = v
+
+        return types
+
     def get_records_per_stream(self, stream: str) -> Iterator[AirbyteMessage]:
         assert self.backend is not None, "Backend must be set to get records per stream"
         self.logger.info(f"Reading records for stream {stream}")
diff --git a/airbyte-ci/connectors/live-tests/src/live_tests/conftest.py b/airbyte-ci/connectors/live-tests/src/live_tests/conftest.py
index 40748143c7e..938f07d098c 100644
--- a/airbyte-ci/connectors/live-tests/src/live_tests/conftest.py
+++ b/airbyte-ci/connectors/live-tests/src/live_tests/conftest.py
@@ -467,12 +467,14 @@ def spec_control_execution_inputs(

 @pytest.fixture(scope="session")
 def spec_control_connector_runner(
+    request: SubRequest,
     dagger_client: dagger.Client,
     spec_control_execution_inputs: ExecutionInputs,
 ) -> ConnectorRunner:
     runner = ConnectorRunner(
         dagger_client,
         spec_control_execution_inputs,
+        request.config.stash[stash_keys.RUN_IN_AIRBYTE_CI],
     )
     return runner
@@ -507,12 +509,14 @@ def spec_target_execution_inputs(

 @pytest.fixture(scope="session")
 def spec_target_connector_runner(
+    request: SubRequest,
     dagger_client: dagger.Client,
     spec_target_execution_inputs: ExecutionInputs,
 ) -> ConnectorRunner:
     runner = ConnectorRunner(
         dagger_client,
         spec_target_execution_inputs,
+        request.config.stash[stash_keys.RUN_IN_AIRBYTE_CI],
     )
     return runner
@@ -551,6 +555,7 @@ def check_control_execution_inputs(

 @pytest.fixture(scope="session")
 async def check_control_connector_runner(
+    request: SubRequest,
     dagger_client: dagger.Client,
     check_control_execution_inputs: ExecutionInputs,
     connection_id: str,
@@ -560,6 +565,7 @@ async def check_control_connector_runner(
     runner = ConnectorRunner(
         dagger_client,
         check_control_execution_inputs,
+        request.config.stash[stash_keys.RUN_IN_AIRBYTE_CI],
         http_proxy=proxy,
     )
     yield runner
@@ -600,6 +606,7 @@ def check_target_execution_inputs(

 @pytest.fixture(scope="session")
 async def check_target_connector_runner(
+    request: SubRequest,
     check_control_execution_result: ExecutionResult,
     dagger_client: dagger.Client,
     check_target_execution_inputs: ExecutionInputs,
@@ -614,6 +621,7 @@ async def check_target_connector_runner(
     runner = ConnectorRunner(
         dagger_client,
         check_target_execution_inputs,
+        request.config.stash[stash_keys.RUN_IN_AIRBYTE_CI],
         http_proxy=proxy,
     )
     yield runner
@@ -685,6 +693,7 @@ def discover_target_execution_inputs(

 @pytest.fixture(scope="session")
 async def discover_control_connector_runner(
+    request: SubRequest,
     dagger_client: dagger.Client,
     discover_control_execution_inputs: ExecutionInputs,
     connection_id: str,
@@ -694,6 +703,7 @@ async def discover_control_connector_runner(
     yield ConnectorRunner(
         dagger_client,
         discover_control_execution_inputs,
+        request.config.stash[stash_keys.RUN_IN_AIRBYTE_CI],
         http_proxy=proxy,
     )
     await proxy.clear_cache_volume()
@@ -701,6 +711,7 @@ async def discover_control_connector_runner(

 @pytest.fixture(scope="session")
 async def discover_target_connector_runner(
+    request: SubRequest,
     dagger_client: dagger.Client,
     discover_control_execution_result: ExecutionResult,
     discover_target_execution_inputs: ExecutionInputs,
@@ -716,6 +727,7 @@ async def discover_target_connector_runner(
     yield ConnectorRunner(
         dagger_client,
         discover_target_execution_inputs,
+        request.config.stash[stash_keys.RUN_IN_AIRBYTE_CI],
         http_proxy=proxy,
     )
     await proxy.clear_cache_volume()
@@ -776,6 +788,7 @@ def read_target_execution_inputs(

 @pytest.fixture(scope="session")
 async def read_control_connector_runner(
+    request: SubRequest,
     dagger_client: dagger.Client,
     read_control_execution_inputs: ExecutionInputs,
     connection_id: str,
@@ -785,6 +798,7 @@ async def read_control_connector_runner(
     yield ConnectorRunner(
         dagger_client,
         read_control_execution_inputs,
+        request.config.stash[stash_keys.RUN_IN_AIRBYTE_CI],
         http_proxy=proxy,
     )
     await proxy.clear_cache_volume()
@@ -806,6 +820,7 @@ async def read_control_execution_result(

 @pytest.fixture(scope="session")
 async def read_target_connector_runner(
+    request: SubRequest,
     dagger_client: dagger.Client,
     read_target_execution_inputs: ExecutionInputs,
     read_control_execution_result: ExecutionResult,
@@ -821,6 +836,7 @@ async def read_target_connector_runner(
     yield ConnectorRunner(
         dagger_client,
         read_target_execution_inputs,
+        request.config.stash[stash_keys.RUN_IN_AIRBYTE_CI],
         http_proxy=proxy,
     )
     await proxy.clear_cache_volume()
@@ -890,6 +906,7 @@ def read_with_state_target_execution_inputs(

 @pytest.fixture(scope="session")
 async def read_with_state_control_connector_runner(
+    request: SubRequest,
     dagger_client: dagger.Client,
     read_with_state_control_execution_inputs: ExecutionInputs,
     connection_id: str,
@@ -899,6 +916,7 @@ async def read_with_state_control_connector_runner(
     yield ConnectorRunner(
         dagger_client,
         read_with_state_control_execution_inputs,
+        request.config.stash[stash_keys.RUN_IN_AIRBYTE_CI],
         http_proxy=proxy,
     )
     await proxy.clear_cache_volume()
@@ -922,6 +940,7 @@ async def read_with_state_control_execution_result(

 @pytest.fixture(scope="session")
 async def read_with_state_target_connector_runner(
+    request: SubRequest,
     dagger_client: dagger.Client,
     read_with_state_target_execution_inputs: ExecutionInputs,
     read_with_state_control_execution_result: ExecutionResult,
@@ -936,6 +955,7 @@ async def read_with_state_target_connector_runner(
     yield ConnectorRunner(
         dagger_client,
         read_with_state_target_execution_inputs,
+        request.config.stash[stash_keys.RUN_IN_AIRBYTE_CI],
         http_proxy=proxy,
     )
     await proxy.clear_cache_volume()
diff --git a/airbyte-ci/connectors/live-tests/src/live_tests/validation_tests/test_read.py b/airbyte-ci/connectors/live-tests/src/live_tests/validation_tests/test_read.py
index 747dd301cd9..c16070293cc 100644
--- a/airbyte-ci/connectors/live-tests/src/live_tests/validation_tests/test_read.py
+++ b/airbyte-ci/connectors/live-tests/src/live_tests/validation_tests/test_read.py
@@ -58,7 +58,7 @@ async def test_read(

         for record in records:
             has_records = True
-            if not conforms_to_schema(record.record.data, stream.schema()):
+            if not conforms_to_schema(read_target_execution_result.get_obfuscated_types(record.record.data), stream.schema()):
                 errors.append(f"A record was encountered that does not conform to the schema. stream={stream.stream.name} record={record}")
             if primary_key:
                 if _extract_primary_key_value(record.dict(), primary_key) is None:
diff --git a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/test/steps/common.py b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/test/steps/common.py
index f4a086d1781..ca8ba508c27 100644
--- a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/test/steps/common.py
+++ b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/test/steps/common.py
@@ -649,7 +649,10 @@ class LiveTests(Step):

         exit_code, stdout, stderr = await get_exec_result(container)

-        if "report.html" not in await container.directory(f"{tests_artifacts_dir}/session_{self.run_id}").entries():
+        if (
+            f"session_{self.run_id}" not in await container.directory(f"{tests_artifacts_dir}").entries()
+            or "report.html" not in await container.directory(f"{tests_artifacts_dir}/session_{self.run_id}").entries()
+        ):
             main_logger.exception(
                 "The report file was not generated, an unhandled error likely happened during regression test execution, please check the step stderr and stdout for more details"
             )
@@ -686,6 +689,10 @@ class LiveTests(Step):
             # Enable dagger-in-dagger
             .with_unix_socket("/var/run/docker.sock", self.dagger_client.host().unix_socket("/var/run/docker.sock"))
             .with_env_variable("RUN_IN_AIRBYTE_CI", "1")
+            .with_file(
+                "/tmp/record_obfuscator.py",
+                self.context.get_repo_dir("tools/bin", include=["record_obfuscator.py"]).file("record_obfuscator.py"),
+            )
             # The connector being tested is already built and is stored in a location accessible to an inner dagger kicked off by
             # regression tests. The connector can be found if you know the container ID, so we write the container ID to a file and put
             # it in the regression test container. This way regression tests will use the already-built connector instead of trying to
diff --git a/tools/bin/record_obfuscator.py b/tools/bin/record_obfuscator.py
new file mode 100755
index 00000000000..d3ef832c3de
--- /dev/null
+++ b/tools/bin/record_obfuscator.py
@@ -0,0 +1,49 @@
+#!/usr/bin/env python3
+# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
+
+
+import hashlib
+import json
+import sys
+from typing import Any
+
+
+def _generate_hash(value: Any) -> str:
+    return hashlib.sha256(str(value).encode()).hexdigest()
+
+
+def obfuscate(value: Any) -> Any:
+    if isinstance(value, str):
+        obfuscated_value = f"string_len-{len(value)}_" + _generate_hash(value)
+    elif isinstance(value, int):
+        obfuscated_value = f"integer_len-{len(str(value))}" + _generate_hash(value)
+    elif isinstance(value, float):
+        obfuscated_value = f"number_len-{len(str(value))}" + _generate_hash(value)
+    elif isinstance(value, bool):
+        obfuscated_value = "boolean_" + _generate_hash(value)
+    elif value is None:
+        obfuscated_value = "null_" + _generate_hash(value)
+    elif isinstance(value, list):
+        obfuscated_value = f"array_len-{len(value)}" + _generate_hash(json.dumps(value, sort_keys=True).encode())
+    elif isinstance(value, dict):
+        obfuscated_value = f"object_len-{len(value.keys())}" + _generate_hash(json.dumps(value, sort_keys=True).encode())
+    else:
+        raise ValueError(f"Unsupported data type: {type(value)}")
+
+    return obfuscated_value
+
+
+if __name__ == "__main__":
+    for line in sys.stdin:
+        line = line.strip()
+        try:
+            data = json.loads(line)
+        except Exception as exc:
+            # We don't expect invalid json so if we see it, it will go to stderr
+            sys.stderr.write(f"{line}\n")
+        else:
+            if data.get("type") == "RECORD":
+                record_data = data["record"].get("data", {})
+                obfuscated_record = {k: obfuscate(v) for k, v in record_data.items()}
+                data["record"]["data"] = obfuscated_record
+            sys.stdout.write(f"{json.dumps(data)}\n")
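
For illustration only (not part of the diff): a minimal sketch of feeding one sample RECORD message through tools/bin/record_obfuscator.py and inspecting the obfuscated output. The stream name and field values are made up, and the script is assumed to be invoked from the repository root.

    # Sketch: pipe a single sample RECORD message through the obfuscator.
    import json
    import subprocess

    sample = {
        "type": "RECORD",
        "record": {"stream": "users", "data": {"id": 123, "email": "jane@example.com", "tags": ["a", "b"]}},
    }
    completed = subprocess.run(
        ["python3", "tools/bin/record_obfuscator.py"],
        input=json.dumps(sample) + "\n",
        capture_output=True,
        text=True,
        check=True,
    )
    print(completed.stdout)
    # Expected shape (sha256 digests abbreviated):
    # {"type": "RECORD", "record": {"stream": "users", "data": {
    #     "id": "integer_len-3<sha256>", "email": "string_len-16_<sha256>", "tags": "array_len-2<sha256>"}}}

Each obfuscated value keeps a type prefix (and, where applicable, a length hint), which is what ExecutionResult.get_obfuscated_types relies on to map fields back to placeholders of the original types ("a", 0, 0.1, True, None, [], {}) before schema inference and conforms_to_schema checks.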