Create record obfuscator and use it in live tests (#43318)
This commit is contained in:
@@ -6,6 +6,8 @@ from __future__ import annotations
|
||||
import datetime
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import subprocess
|
||||
import uuid
|
||||
from pathlib import Path
|
||||
from typing import Optional
|
||||
@@ -23,11 +25,13 @@ class ConnectorRunner:
|
||||
# Paths inside the connector container where test inputs/outputs are mounted.
IN_CONTAINER_CONFIGURED_CATALOG_PATH = "/data/catalog.json"
IN_CONTAINER_STATE_PATH = "/data/state.json"
IN_CONTAINER_OUTPUT_PATH = "/output.txt"
# Fix: "/user/local/bin" was a typo for the conventional "/usr/local/bin".
# Safe to change: both the file copy into the container and the shell command
# that invokes the obfuscator reference this constant, so they stay in sync.
IN_CONTAINER_OBFUSCATOR_PATH = "/usr/local/bin/record_obfuscator.py"
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
dagger_client: dagger.Client,
|
||||
execution_inputs: ExecutionInputs,
|
||||
is_airbyte_ci: bool,
|
||||
http_proxy: Optional[Proxy] = None,
|
||||
):
|
||||
self.connector_under_test = execution_inputs.connector_under_test
|
||||
@@ -45,6 +49,11 @@ class ConnectorRunner:
|
||||
self.http_proxy = http_proxy
|
||||
self.logger = logging.getLogger(f"{self.connector_under_test.name}-{self.connector_under_test.version}")
|
||||
self.dagger_client = dagger_client.pipeline(f"{self.connector_under_test.name}-{self.connector_under_test.version}")
|
||||
if is_airbyte_ci:
|
||||
self.host_obfuscator_path = "/tmp/record_obfuscator.py"
|
||||
else:
|
||||
repo_root = Path(subprocess.check_output(["git", "rev-parse", "--show-toplevel"]).strip().decode())
|
||||
self.host_obfuscator_path = f"{repo_root}/tools/bin/record_obfuscator.py"
|
||||
|
||||
@property
|
||||
def _connector_under_test_container(self) -> dagger.Container:
|
||||
@@ -109,6 +118,12 @@ class ConnectorRunner:
|
||||
container = self._connector_under_test_container
|
||||
# Do not cache downstream dagger layers
|
||||
container = container.with_env_variable("CACHEBUSTER", str(uuid.uuid4()))
|
||||
expanded_host_executable_path = os.path.expanduser(self.host_obfuscator_path)
|
||||
|
||||
container = container.with_file(
|
||||
self.IN_CONTAINER_OBFUSCATOR_PATH,
|
||||
self.dagger_client.host().file(expanded_host_executable_path),
|
||||
)
|
||||
for env_var_name, env_var_value in self.environment_variables.items():
|
||||
container = container.with_env_variable(env_var_name, env_var_value)
|
||||
if self.config:
|
||||
@@ -134,7 +149,8 @@ class ConnectorRunner:
|
||||
[
|
||||
"sh",
|
||||
"-c",
|
||||
" ".join(airbyte_command) + f" > {self.IN_CONTAINER_OUTPUT_PATH} 2>&1 | tee -a {self.IN_CONTAINER_OUTPUT_PATH}",
|
||||
" ".join(airbyte_command)
|
||||
+ f"| {self.IN_CONTAINER_OBFUSCATOR_PATH} > {self.IN_CONTAINER_OUTPUT_PATH} 2>&1 | tee -a {self.IN_CONTAINER_OUTPUT_PATH}",
|
||||
],
|
||||
skip_entrypoint=True,
|
||||
)
|
||||
|
||||
@@ -321,10 +321,36 @@ class ExecutionResult:
|
||||
stream_schema_builder = SchemaBuilder()
|
||||
stream_schema_builder.add_schema({"type": "object", "properties": {}})
|
||||
stream_builders[stream] = stream_schema_builder
|
||||
stream_builders[stream].add_object(record.record.data)
|
||||
stream_builders[stream].add_object(self.get_obfuscated_types(record.record.data))
|
||||
self.logger.info("Stream schemas generated")
|
||||
return {stream: sort_dict_keys(stream_builders[stream].to_schema()) for stream in stream_builders}
|
||||
|
||||
@staticmethod
def get_obfuscated_types(data: dict[str, Any]) -> dict[str, Any]:
    """
    Convert an obfuscated record into a record whose values have the same
    JSON type as the original values.

    record_obfuscator.py replaces every value with a string of the form
    "<type>_..." (e.g. "string_len-3_<sha256>").  This helper maps each
    prefix back to a sentinel value of the original type so that schema
    inference (genson SchemaBuilder) sees the correct JSON types.

    Args:
        data: A single record's ``data`` payload, as emitted by the
            obfuscator (values are expected to be prefix-tagged strings).

    Returns:
        A dict with the same keys whose values are type-representative
        sentinels ("a", 0, 0.1, True, None, [], {}).  Values without a
        recognized prefix — or non-string values — are passed through
        unchanged.
    """
    types = {}
    for k, v in data.items():
        if not isinstance(v, str):
            # Robustness fix: a non-string value cannot carry an obfuscation
            # prefix, and calling .startswith on it would raise
            # AttributeError; pass it through unchanged instead.
            types[k] = v
        elif v.startswith("string_"):
            types[k] = "a"
        elif v.startswith("integer_"):
            types[k] = 0
        elif v.startswith("number_"):
            types[k] = 0.1
        elif v.startswith("boolean_"):
            types[k] = True
        elif v.startswith("null_"):
            types[k] = None
        elif v.startswith("array_"):
            types[k] = []
        elif v.startswith("object_"):
            types[k] = {}
        else:
            # Unrecognized prefix: keep the value as-is rather than guessing.
            types[k] = v

    return types
|
||||
|
||||
def get_records_per_stream(self, stream: str) -> Iterator[AirbyteMessage]:
|
||||
assert self.backend is not None, "Backend must be set to get records per stream"
|
||||
self.logger.info(f"Reading records for stream {stream}")
|
||||
|
||||
@@ -467,12 +467,14 @@ def spec_control_execution_inputs(
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def spec_control_connector_runner(
|
||||
request: SubRequest,
|
||||
dagger_client: dagger.Client,
|
||||
spec_control_execution_inputs: ExecutionInputs,
|
||||
) -> ConnectorRunner:
|
||||
runner = ConnectorRunner(
|
||||
dagger_client,
|
||||
spec_control_execution_inputs,
|
||||
request.config.stash[stash_keys.RUN_IN_AIRBYTE_CI],
|
||||
)
|
||||
return runner
|
||||
|
||||
@@ -507,12 +509,14 @@ def spec_target_execution_inputs(
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def spec_target_connector_runner(
|
||||
request: SubRequest,
|
||||
dagger_client: dagger.Client,
|
||||
spec_target_execution_inputs: ExecutionInputs,
|
||||
) -> ConnectorRunner:
|
||||
runner = ConnectorRunner(
|
||||
dagger_client,
|
||||
spec_target_execution_inputs,
|
||||
request.config.stash[stash_keys.RUN_IN_AIRBYTE_CI],
|
||||
)
|
||||
return runner
|
||||
|
||||
@@ -551,6 +555,7 @@ def check_control_execution_inputs(
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
async def check_control_connector_runner(
|
||||
request: SubRequest,
|
||||
dagger_client: dagger.Client,
|
||||
check_control_execution_inputs: ExecutionInputs,
|
||||
connection_id: str,
|
||||
@@ -560,6 +565,7 @@ async def check_control_connector_runner(
|
||||
runner = ConnectorRunner(
|
||||
dagger_client,
|
||||
check_control_execution_inputs,
|
||||
request.config.stash[stash_keys.RUN_IN_AIRBYTE_CI],
|
||||
http_proxy=proxy,
|
||||
)
|
||||
yield runner
|
||||
@@ -600,6 +606,7 @@ def check_target_execution_inputs(
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
async def check_target_connector_runner(
|
||||
request: SubRequest,
|
||||
check_control_execution_result: ExecutionResult,
|
||||
dagger_client: dagger.Client,
|
||||
check_target_execution_inputs: ExecutionInputs,
|
||||
@@ -614,6 +621,7 @@ async def check_target_connector_runner(
|
||||
runner = ConnectorRunner(
|
||||
dagger_client,
|
||||
check_target_execution_inputs,
|
||||
request.config.stash[stash_keys.RUN_IN_AIRBYTE_CI],
|
||||
http_proxy=proxy,
|
||||
)
|
||||
yield runner
|
||||
@@ -685,6 +693,7 @@ def discover_target_execution_inputs(
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
async def discover_control_connector_runner(
|
||||
request: SubRequest,
|
||||
dagger_client: dagger.Client,
|
||||
discover_control_execution_inputs: ExecutionInputs,
|
||||
connection_id: str,
|
||||
@@ -694,6 +703,7 @@ async def discover_control_connector_runner(
|
||||
yield ConnectorRunner(
|
||||
dagger_client,
|
||||
discover_control_execution_inputs,
|
||||
request.config.stash[stash_keys.RUN_IN_AIRBYTE_CI],
|
||||
http_proxy=proxy,
|
||||
)
|
||||
await proxy.clear_cache_volume()
|
||||
@@ -701,6 +711,7 @@ async def discover_control_connector_runner(
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
async def discover_target_connector_runner(
|
||||
request: SubRequest,
|
||||
dagger_client: dagger.Client,
|
||||
discover_control_execution_result: ExecutionResult,
|
||||
discover_target_execution_inputs: ExecutionInputs,
|
||||
@@ -716,6 +727,7 @@ async def discover_target_connector_runner(
|
||||
yield ConnectorRunner(
|
||||
dagger_client,
|
||||
discover_target_execution_inputs,
|
||||
request.config.stash[stash_keys.RUN_IN_AIRBYTE_CI],
|
||||
http_proxy=proxy,
|
||||
)
|
||||
await proxy.clear_cache_volume()
|
||||
@@ -776,6 +788,7 @@ def read_target_execution_inputs(
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
async def read_control_connector_runner(
|
||||
request: SubRequest,
|
||||
dagger_client: dagger.Client,
|
||||
read_control_execution_inputs: ExecutionInputs,
|
||||
connection_id: str,
|
||||
@@ -785,6 +798,7 @@ async def read_control_connector_runner(
|
||||
yield ConnectorRunner(
|
||||
dagger_client,
|
||||
read_control_execution_inputs,
|
||||
request.config.stash[stash_keys.RUN_IN_AIRBYTE_CI],
|
||||
http_proxy=proxy,
|
||||
)
|
||||
await proxy.clear_cache_volume()
|
||||
@@ -806,6 +820,7 @@ async def read_control_execution_result(
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
async def read_target_connector_runner(
|
||||
request: SubRequest,
|
||||
dagger_client: dagger.Client,
|
||||
read_target_execution_inputs: ExecutionInputs,
|
||||
read_control_execution_result: ExecutionResult,
|
||||
@@ -821,6 +836,7 @@ async def read_target_connector_runner(
|
||||
yield ConnectorRunner(
|
||||
dagger_client,
|
||||
read_target_execution_inputs,
|
||||
request.config.stash[stash_keys.RUN_IN_AIRBYTE_CI],
|
||||
http_proxy=proxy,
|
||||
)
|
||||
await proxy.clear_cache_volume()
|
||||
@@ -890,6 +906,7 @@ def read_with_state_target_execution_inputs(
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
async def read_with_state_control_connector_runner(
|
||||
request: SubRequest,
|
||||
dagger_client: dagger.Client,
|
||||
read_with_state_control_execution_inputs: ExecutionInputs,
|
||||
connection_id: str,
|
||||
@@ -899,6 +916,7 @@ async def read_with_state_control_connector_runner(
|
||||
yield ConnectorRunner(
|
||||
dagger_client,
|
||||
read_with_state_control_execution_inputs,
|
||||
request.config.stash[stash_keys.RUN_IN_AIRBYTE_CI],
|
||||
http_proxy=proxy,
|
||||
)
|
||||
await proxy.clear_cache_volume()
|
||||
@@ -922,6 +940,7 @@ async def read_with_state_control_execution_result(
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
async def read_with_state_target_connector_runner(
|
||||
request: SubRequest,
|
||||
dagger_client: dagger.Client,
|
||||
read_with_state_target_execution_inputs: ExecutionInputs,
|
||||
read_with_state_control_execution_result: ExecutionResult,
|
||||
@@ -936,6 +955,7 @@ async def read_with_state_target_connector_runner(
|
||||
yield ConnectorRunner(
|
||||
dagger_client,
|
||||
read_with_state_target_execution_inputs,
|
||||
request.config.stash[stash_keys.RUN_IN_AIRBYTE_CI],
|
||||
http_proxy=proxy,
|
||||
)
|
||||
await proxy.clear_cache_volume()
|
||||
|
||||
@@ -58,7 +58,7 @@ async def test_read(
|
||||
|
||||
for record in records:
|
||||
has_records = True
|
||||
if not conforms_to_schema(record.record.data, stream.schema()):
|
||||
if not conforms_to_schema(read_target_execution_result.get_obfuscated_types(record.record.data), stream.schema()):
|
||||
errors.append(f"A record was encountered that does not conform to the schema. stream={stream.stream.name} record={record}")
|
||||
if primary_key:
|
||||
if _extract_primary_key_value(record.dict(), primary_key) is None:
|
||||
|
||||
@@ -649,7 +649,10 @@ class LiveTests(Step):
|
||||
|
||||
exit_code, stdout, stderr = await get_exec_result(container)
|
||||
|
||||
if "report.html" not in await container.directory(f"{tests_artifacts_dir}/session_{self.run_id}").entries():
|
||||
if (
|
||||
f"session_{self.run_id}" not in await container.directory(f"{tests_artifacts_dir}").entries()
|
||||
or "report.html" not in await container.directory(f"{tests_artifacts_dir}/session_{self.run_id}").entries()
|
||||
):
|
||||
main_logger.exception(
|
||||
"The report file was not generated, an unhandled error likely happened during regression test execution, please check the step stderr and stdout for more details"
|
||||
)
|
||||
@@ -686,6 +689,10 @@ class LiveTests(Step):
|
||||
# Enable dagger-in-dagger
|
||||
.with_unix_socket("/var/run/docker.sock", self.dagger_client.host().unix_socket("/var/run/docker.sock"))
|
||||
.with_env_variable("RUN_IN_AIRBYTE_CI", "1")
|
||||
.with_file(
|
||||
"/tmp/record_obfuscator.py",
|
||||
self.context.get_repo_dir("tools/bin", include=["record_obfuscator.py"]).file("record_obfuscator.py"),
|
||||
)
|
||||
# The connector being tested is already built and is stored in a location accessible to an inner dagger kicked off by
|
||||
# regression tests. The connector can be found if you know the container ID, so we write the container ID to a file and put
|
||||
# it in the regression test container. This way regression tests will use the already-built connector instead of trying to
|
||||
|
||||
49
tools/bin/record_obfuscator.py
Executable file
49
tools/bin/record_obfuscator.py
Executable file
@@ -0,0 +1,49 @@
|
||||
#!/usr/bin/env python3
|
||||
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
|
||||
|
||||
|
||||
import hashlib
|
||||
import json
|
||||
import sys
|
||||
from typing import Any
|
||||
|
||||
|
||||
def _generate_hash(value: Any) -> str:
    """Return the sha256 hex digest of ``str(value)``."""
    return hashlib.sha256(str(value).encode()).hexdigest()


def obfuscate(value: Any) -> Any:
    """
    Replace *value* with a string that leaks only its JSON type, its size,
    and a sha256 fingerprint, e.g. ``"string_len-3_<hex>"``.

    The ``<type>_`` prefixes must stay in sync with
    ``ExecutionResult.get_obfuscated_types``, which maps them back to
    type-representative sentinels for schema inference.

    Raises:
        ValueError: if *value* is not a JSON-representable type.
    """
    # Bug fix: check bool BEFORE int.  isinstance(True, int) is True in
    # Python (bool subclasses int), so the previous ordering routed booleans
    # into the "integer_" branch and made the boolean branch unreachable —
    # which in turn made get_obfuscated_types infer type 0 instead of True.
    if isinstance(value, bool):
        obfuscated_value = "boolean_" + _generate_hash(value)
    elif isinstance(value, str):
        obfuscated_value = f"string_len-{len(value)}_" + _generate_hash(value)
    elif isinstance(value, int):
        obfuscated_value = f"integer_len-{len(str(value))}" + _generate_hash(value)
    elif isinstance(value, float):
        obfuscated_value = f"number_len-{len(str(value))}" + _generate_hash(value)
    elif value is None:
        obfuscated_value = "null_" + _generate_hash(value)
    elif isinstance(value, list):
        # sort_keys makes the fingerprint independent of dict key order in
        # nested objects.
        obfuscated_value = f"array_len-{len(value)}" + _generate_hash(json.dumps(value, sort_keys=True).encode())
    elif isinstance(value, dict):
        obfuscated_value = f"object_len-{len(value.keys())}" + _generate_hash(json.dumps(value, sort_keys=True).encode())
    else:
        raise ValueError(f"Unsupported data type: {type(value)}")

    return obfuscated_value
|
||||
|
||||
|
||||
if __name__ == "__main__":
    # Act as a line-oriented filter: JSON messages in on stdin, messages out
    # on stdout, with RECORD payloads obfuscated in place.
    for raw_line in sys.stdin:
        stripped = raw_line.strip()
        try:
            message = json.loads(stripped)
        except Exception:
            # We don't expect invalid json so if we see it, it will go to stderr
            sys.stderr.write(f"{stripped}\n")
            continue
        if message.get("type") == "RECORD":
            payload = message["record"].get("data", {})
            message["record"]["data"] = {field: obfuscate(val) for field, val in payload.items()}
        sys.stdout.write(f"{json.dumps(message)}\n")
|
||||
Reference in New Issue
Block a user