1
0
mirror of synced 2025-12-20 02:23:30 -05:00
Files
airbyte/tools/git_hooks/spec_linter.py
2023-08-25 10:20:41 -07:00

138 lines
4.2 KiB
Python

#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
"""
This script is responsible for connectors spec.json file validation.
Input:
List of spec files
Output:
exit code 0 - check is success
exit code 1 - check failed for at least one spec file
How spec file validation works:
1. read spec file and serialize it as python dict object
2. get properties field from spec object
3. check if all fields from FIELDS_TO_CHECK exist in each property
4. if field has oneOf attribute - fetch all subobjects and for each of them goto step (2)
"""
import json
import logging
import sys
from typing import Any, List, Mapping, Optional, Tuple
# required fields for each property field in spec
FIELDS_TO_CHECK = {"title", "description"}
# configure logging
logging.basicConfig(format="%(message)s")
def read_spec_file(spec_path: str) -> bool:
"""
Parses spec file and applies validation rules.
Returns True if spec is valid else False
"""
errors: List[Tuple[str, Optional[str]]] = []
with open(spec_path) as json_file:
try:
root_schema = json.load(json_file)["connectionSpecification"]["properties"]
except (KeyError, TypeError):
errors.append(("Couldn't find properties in connector spec.json", None))
except json.JSONDecodeError:
errors.append(("Couldn't parse json file", None))
else:
errors.extend(validate_schema(spec_path, root_schema))
for err_msg, err_field in errors:
print_error(spec_path, err_msg, err_field)
return False if errors else True
def print_error(spec_path: str, error_message: str, failed_field: Optional[str] = None) -> None:
"""
Logs error in following format: <BOLD>SPEC PATH</BOLD> ERROR MSG <RED>FIELD NAME</RED>
"""
error = f"\033[1m{spec_path}\033[0m: {error_message}"
if failed_field:
error += f" \x1b[31;1m{failed_field}\033[0m"
logging.error(error)
def validate_schema(
spec_path: str,
schema: Mapping[str, Any],
parent_fields: Optional[List[str]] = None,
) -> List[Tuple[str, str]]:
"""
Validates given spec dictionary object. Returns list of errors
"""
errors: List[Tuple[str, str]] = []
parent_fields = parent_fields if parent_fields else []
for field_name, field_schema in schema.items():
field_errors = validate_field(field_name, field_schema, parent_fields)
errors.extend(field_errors)
if field_errors:
continue
for index, oneof_schema in enumerate(fetch_oneof_schemas(field_schema)):
errors.extend(
validate_schema(
spec_path,
oneof_schema["properties"],
parent_fields + [field_name, str(index)],
)
)
return errors
def fetch_oneof_schemas(schema: Mapping[str, Any]) -> List[Mapping[str, Any]]:
"""
Finds subschemas in oneOf field
"""
return [spec for spec in schema.get("oneOf", []) if spec.get("properties")]
def validate_field(
field_name: str,
schema: Mapping[str, Any],
parent_fields: Optional[List[str]] = None,
) -> List[Tuple[str, str]]:
"""
Validates single field objects and return errors if they exist
"""
if "const" in schema.keys():
# Field with "const" value is metainfo and not expected to contain title
# and description.
return []
errors: List[Tuple[str, str]] = []
full_field_name = get_full_field_name(field_name, parent_fields)
if not FIELDS_TO_CHECK.issubset(schema.keys()):
errors.append(("Check failed for field", full_field_name))
if schema.get("oneOf") and (schema["type"] != "object" or not isinstance(schema["oneOf"], list)):
errors.append(("Incorrect oneOf schema in field", full_field_name))
return errors
def get_full_field_name(field_name: str, parent_fields: Optional[List[str]] = None) -> str:
"""
Returns full path to a field.
e.g. root.middle.child, root.oneof.1.attr
"""
return ".".join(parent_fields + [field_name]) if parent_fields else field_name
if __name__ == "__main__":
spec_files = sys.argv[1:]
if not all([read_spec_file(file_path) for file_path in spec_files]):
exit(1)