
feat(metadata_service): add new generate-connector-registry and generate-specs-secrets-mask operations (#64572)

Patrick Nilan
2025-08-13 21:32:09 -07:00
committed by GitHub
parent f361d660c2
commit a7930880ca
15 changed files with 1638 additions and 29 deletions

View File

@@ -0,0 +1,143 @@
name: Generate Connector Registries
on:
workflow_dispatch:
inputs:
gitref:
description: "The git ref to check out from the repository"
required: false
type: string
# TODO: uncomment once ready to implement schedule
# schedule:
# - cron: '*/5 * * * *' # Run every 5 minutes
permissions:
contents: read
jobs:
generate-cloud-registry:
name: Generate Cloud Registry
runs-on: ubuntu-24.04
steps:
- name: Checkout Airbyte
uses: actions/checkout@8edcb1bdb4e267140fa742c62e395cd74f332709 # v4.2.2
with:
ref: ${{ github.event.inputs.gitref || github.ref }}
- name: Set up Python
uses: actions/setup-python@9322b3ca74000aeb2c01eb777b646334015ddd72 # v5.6.0
with:
python-version: "3.11"
check-latest: true
update-environment: true
- name: Install Poetry
uses: snok/install-poetry@d526ede1e27960b7b181a5ac53044f552afdaa38 # v1.4.1
with:
version: latest
virtualenvs-create: true
virtualenvs-in-project: true
- name: Install metadata_service
working-directory: airbyte-ci/connectors/metadata_service/lib
run: poetry install
- name: Generate Cloud Registry
env:
GCS_CREDENTIALS: ${{ secrets.METADATA_SERVICE_PROD_GCS_CREDENTIALS }}
GCS_DEV_CREDENTIALS: ${{ secrets.METADATA_SERVICE_DEV_GCS_CREDENTIALS }}
SLACK_TOKEN: ${{ secrets.SLACK_BOT_TOKEN_AIRBYTE_TEAM }}
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
# SENTRY_DSN: ${{ secrets.SENTRY_DSN_METADATA_SERVICE }}
# SENTRY_TRACES_SAMPLE_RATE: 1.0
# SENTRY_ENVIRONMENT: production
working-directory: airbyte-ci/connectors/metadata_service/lib
shell: bash
run: |
poetry run metadata_service generate-connector-registry prod-airbyte-cloud-connector-metadata-service cloud
generate-oss-registry:
name: Generate OSS Registry
runs-on: ubuntu-24.04
steps:
- name: Checkout Airbyte
uses: actions/checkout@8edcb1bdb4e267140fa742c62e395cd74f332709 # v4.2.2
with:
ref: ${{ github.event.inputs.gitref || github.ref }}
- name: Set up Python
uses: actions/setup-python@9322b3ca74000aeb2c01eb777b646334015ddd72 # v5.6.0
with:
python-version: "3.11"
check-latest: true
update-environment: true
- name: Install Poetry
uses: snok/install-poetry@d526ede1e27960b7b181a5ac53044f552afdaa38 # v1.4.1
with:
version: latest
virtualenvs-create: true
virtualenvs-in-project: true
- name: Install metadata_service
working-directory: airbyte-ci/connectors/metadata_service/lib
run: poetry install
- name: Generate OSS Registry
env:
GCS_CREDENTIALS: ${{ secrets.METADATA_SERVICE_PROD_GCS_CREDENTIALS }}
GCS_DEV_CREDENTIALS: ${{ secrets.METADATA_SERVICE_DEV_GCS_CREDENTIALS }}
SLACK_TOKEN: ${{ secrets.SLACK_BOT_TOKEN_AIRBYTE_TEAM }}
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
# SENTRY_DSN: ${{ secrets.SENTRY_DSN_METADATA_SERVICE }}
# SENTRY_TRACES_SAMPLE_RATE: 1.0
# SENTRY_ENVIRONMENT: production
working-directory: airbyte-ci/connectors/metadata_service/lib
shell: bash
run: |
poetry run metadata_service generate-connector-registry prod-airbyte-cloud-connector-metadata-service oss
generate-specs-secrets-mask:
name: Generate Specs Secrets Mask
runs-on: ubuntu-24.04
needs: [generate-cloud-registry, generate-oss-registry]
steps:
- name: Checkout Airbyte
uses: actions/checkout@8edcb1bdb4e267140fa742c62e395cd74f332709 # v4.2.2
with:
ref: ${{ github.event.inputs.gitref || github.ref }}
- name: Set up Python
uses: actions/setup-python@9322b3ca74000aeb2c01eb777b646334015ddd72 # v5.6.0
with:
python-version: "3.11"
check-latest: true
update-environment: true
- name: Install Poetry
uses: snok/install-poetry@d526ede1e27960b7b181a5ac53044f552afdaa38 # v1.4.1
with:
version: latest
virtualenvs-create: true
virtualenvs-in-project: true
- name: Install metadata_service
working-directory: airbyte-ci/connectors/metadata_service/lib
run: poetry install
- name: Generate Specs Secrets Mask
env:
GCS_CREDENTIALS: ${{ secrets.METADATA_SERVICE_PROD_GCS_CREDENTIALS }}
GCS_DEV_CREDENTIALS: ${{ secrets.METADATA_SERVICE_DEV_GCS_CREDENTIALS }}
SLACK_TOKEN: ${{ secrets.SLACK_BOT_TOKEN_AIRBYTE_TEAM }}
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
# SENTRY_DSN: ${{ secrets.SENTRY_DSN_METADATA_SERVICE }}
# SENTRY_TRACES_SAMPLE_RATE: 1.0
# SENTRY_ENVIRONMENT: production
working-directory: airbyte-ci/connectors/metadata_service/lib
shell: bash
run: |
poetry run metadata_service generate-specs-secrets-mask prod-airbyte-cloud-connector-metadata-service

View File

@@ -6,9 +6,10 @@ import logging
import pathlib
import click
import sentry_sdk
from pydantic import ValidationError
from metadata_service.constants import METADATA_FILE_NAME
from metadata_service.constants import METADATA_FILE_NAME, VALID_REGISTRIES
from metadata_service.gcs_upload import (
MetadataDeleteInfo,
MetadataUploadInfo,
@@ -16,6 +17,9 @@ from metadata_service.gcs_upload import (
promote_release_candidate_in_gcs,
upload_metadata_to_gcs,
)
from metadata_service.registry import generate_and_persist_connector_registry
from metadata_service.sentry import setup_sentry
from metadata_service.specs_secrets_mask import generate_and_persist_specs_secrets_mask
from metadata_service.stale_metadata_report import generate_and_publish_stale_metadata_report
from metadata_service.validators.metadata_validator import PRE_UPLOAD_VALIDATORS, ValidatorOptions, validate_and_load
@@ -29,9 +33,10 @@ def setup_logging(debug: bool = False):
datefmt="%Y-%m-%d %H:%M:%S",
handlers=[logging.StreamHandler()],
)
# Suppress logging from urllib3 and slack_sdk
# Suppress logging from the following libraries
logging.getLogger("urllib3").setLevel(logging.WARNING)
logging.getLogger("slack_sdk.web.base_client").setLevel(logging.WARNING)
logging.getLogger("google.resumable_media").setLevel(logging.WARNING)
logger = logging.getLogger(__name__)
@@ -65,6 +70,7 @@ def log_metadata_deletion_info(metadata_deletion_info: MetadataDeleteInfo):
@click.option("--debug", is_flag=True, help="Enable debug logging", default=False)
def metadata_service(debug: bool):
"""Top-level command group with logging configuration."""
setup_sentry()
setup_logging(debug)
@@ -152,3 +158,45 @@ def promote_release_candidate(connector_docker_repository: str, connector_versio
except (FileNotFoundError, ValueError) as e:
click.secho(f"The release candidate could not be promoted: {str(e)}", fg="red")
exit(1)
@metadata_service.command(help="Generate the cloud registry and persist it to GCS.")
@click.argument("bucket-name", type=click.STRING, required=True)
@click.argument("registry-type", type=click.Choice(VALID_REGISTRIES), required=True)
@sentry_sdk.trace
def generate_connector_registry(bucket_name: str, registry_type: str):
# Set Sentry context for the generate_registry command
sentry_sdk.set_tag("command", "generate_registry")
sentry_sdk.set_tag("bucket_name", bucket_name)
sentry_sdk.set_tag("registry_type", registry_type)
logger.info(f"Starting {registry_type} registry generation and upload process.")
try:
generate_and_persist_connector_registry(bucket_name, registry_type)
logger.info(f"SUCCESS: {registry_type} registry generation and upload process completed successfully.")
sentry_sdk.set_tag("operation_success", True)
except Exception as e:
sentry_sdk.set_tag("operation_success", False)
sentry_sdk.capture_exception(e)
logger.error(f"FATAL ERROR: An error occurred when generating and persisting the {registry_type} registry: {str(e)}")
exit(1)
@metadata_service.command(help="Generate the specs secrets mask and persist it to GCS.")
@click.argument("bucket-name", type=click.STRING, required=True)
@sentry_sdk.trace
def generate_specs_secrets_mask(bucket_name: str):
# Set Sentry context for the generate_specs_secrets_mask command
sentry_sdk.set_tag("command", "generate_specs_secrets_mask")
sentry_sdk.set_tag("bucket_name", bucket_name)
logger.info("Starting specs secrets mask generation and upload process.")
try:
generate_and_persist_specs_secrets_mask(bucket_name)
sentry_sdk.set_tag("operation_success", True)
logger.info("Specs secrets mask generation and upload process completed successfully.")
except Exception as e:
sentry_sdk.set_tag("operation_success", False)
sentry_sdk.capture_exception(e)
logger.error(f"FATAL ERROR: An error occurred when generating and persisting the specs secrets mask: {str(e)}")
exit(1)
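
For a quick local smoke test of the two new commands, Click's test runner can drive the group directly. A minimal sketch, assuming the group is importable from the metadata_service commands module and that "my-test-bucket" (a hypothetical name) is replaced with a real bucket reachable via GCS_CREDENTIALS:

from click.testing import CliRunner

from metadata_service.commands import metadata_service  # import path assumed

runner = CliRunner()
# Click derives CLI names from function names, so generate_connector_registry
# is invoked as "generate-connector-registry".
result = runner.invoke(metadata_service, ["generate-connector-registry", "my-test-bucket", "cloud"])
print(result.exit_code, result.output)

result = runner.invoke(metadata_service, ["generate-specs-secrets-mask", "my-test-bucket"])
print(result.exit_code, result.output)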

View File

@@ -17,6 +17,12 @@ COMPONENTS_ZIP_SHA256_FILE_NAME = "components.zip.sha256"
LATEST_GCS_FOLDER_NAME = "latest"
RELEASE_CANDIDATE_GCS_FOLDER_NAME = "release_candidate"
VALID_REGISTRIES = ["oss", "cloud"]
REGISTRIES_FOLDER = "registries/v0"
ANALYTICS_BUCKET = "ab-analytics-connector-metrics"
ANALYTICS_FOLDER = "data/connector_quality_metrics"
PUBLIC_GCS_BASE_URL = "https://storage.googleapis.com/"
GITHUB_REPO_NAME = "airbytehq/airbyte"
EXTENSIBILITY_TEAM_SLACK_TEAM_ID = "S08SQDL2RS9" # @oc-extensibility-critical-systems
STALE_REPORT_CHANNEL = "C05507UP11A" # #dev-connectors-extensibility-alerts
@@ -28,3 +34,5 @@ PUBLISH_UPDATE_CHANNEL = "C056HGD1QSW" # #connector-publish-updates
# A shorter grace period could lead to false positives in stale metadata detection.
PUBLISH_GRACE_PERIOD = datetime.timedelta(hours=6)
SLACK_NOTIFICATIONS_ENABLED = "true"
SPECS_SECRETS_MASK_FILE_NAME = "specs_secrets_mask.yaml"
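
These constants compose into the public URLs of the generated artifacts. A small sketch of that composition (the exact URL assembly is not shown in this diff, so treat it as an assumption):

from metadata_service.constants import PUBLIC_GCS_BASE_URL, REGISTRIES_FOLDER

bucket_name = "prod-airbyte-cloud-connector-metadata-service"
# Assumed composition: base URL + bucket + registries folder + file name
registry_url = f"{PUBLIC_GCS_BASE_URL}{bucket_name}/{REGISTRIES_FOLDER}/cloud_registry.json"
# -> https://storage.googleapis.com/prod-airbyte-cloud-connector-metadata-service/registries/v0/cloud_registry.json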

View File

@@ -2,9 +2,9 @@
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
#
import base64
import json
import os
from typing import Optional
from google.cloud import storage
from google.oauth2 import service_account
@@ -19,3 +19,18 @@ def get_gcs_storage_client() -> storage.Client:
service_account_info = json.loads(gcs_creds)
credentials = service_account.Credentials.from_service_account_info(service_account_info)
return storage.Client(credentials=credentials)
def safe_read_gcs_file(gcs_blob: storage.Blob) -> Optional[str]:
"""Read the connector metrics jsonl blob.
Args:
gcs_blob (storage.Blob): The blob.
Returns:
dict: The metrics.
"""
if not gcs_blob.exists():
return None
return gcs_blob.download_as_string().decode("utf-8")
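
Because safe_read_gcs_file returns None when the blob is missing, callers should guard before parsing. A minimal sketch (the bucket name and object path are hypothetical):

import json

from metadata_service.helpers.gcs import get_gcs_storage_client, safe_read_gcs_file

client = get_gcs_storage_client()
bucket = client.bucket("my-test-bucket")  # hypothetical bucket

content = safe_read_gcs_file(bucket.blob("registries/v0/cloud_registry.json"))
registry_dict = json.loads(content) if content is not None else None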

View File

@@ -0,0 +1,39 @@
#
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
#
from enum import EnumMeta
class CaseInsensitiveKeys(EnumMeta):
"""A metaclass for creating enums with case-insensitive keys."""
    def __getitem__(cls, item):
        try:
            return super().__getitem__(item)
        except KeyError:
            for key in cls._member_map_:
                if key.casefold() == item.casefold():
                    return super().__getitem__(key)
            raise
def default_none_to_dict(value, key, obj):
"""Set the value of a key in a dictionary to an empty dictionary if the value is None.
Useful with pydash's set_with function.
e.g. set_with(obj, key, value, default_none_to_dict)
For more information, see https://github.com/dgilland/pydash/issues/122
Args:
value: The value to check.
key: The key to set in the dictionary.
obj: The dictionary to set the key in.
"""
if obj is None:
return
if value is None:
obj[key] = {}
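
To illustrate the two helpers, a standalone sketch using a throwaway enum (Platform is invented for the example):

from enum import Enum

from pydash.objects import set_with

from metadata_service.helpers.object_helpers import CaseInsensitiveKeys, default_none_to_dict

class Platform(str, Enum, metaclass=CaseInsensitiveKeys):
    CLOUD = "cloud"

# Lookups succeed regardless of case.
assert Platform["cloud"] is Platform["CLOUD"] is Platform.CLOUD

# default_none_to_dict lets set_with tunnel through keys whose current value is None.
entry = {"generated": None}
set_with(entry, "generated.metrics", {"usage": 100}, default_none_to_dict)
# entry is now {"generated": {"metrics": {"usage": 100}}}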

View File

@@ -0,0 +1,371 @@
#
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
#
import copy
import json
import logging
import os
import urllib.parse
from collections import defaultdict
from enum import Enum
from typing import Optional, Union
import semver
import sentry_sdk
from google.cloud import storage
from google.oauth2 import service_account
from pydash.objects import set_with
from metadata_service.constants import (
ANALYTICS_BUCKET,
ANALYTICS_FOLDER,
METADATA_FOLDER,
PUBLIC_GCS_BASE_URL,
PUBLISH_UPDATE_CHANNEL,
REGISTRIES_FOLDER,
VALID_REGISTRIES,
)
from metadata_service.helpers.gcs import get_gcs_storage_client, safe_read_gcs_file
from metadata_service.helpers.object_helpers import CaseInsensitiveKeys, default_none_to_dict
from metadata_service.helpers.slack import send_slack_message
from metadata_service.models.generated import ConnectorRegistryDestinationDefinition, ConnectorRegistrySourceDefinition, ConnectorRegistryV0
from metadata_service.models.transform import to_json_sanitized_dict
logger = logging.getLogger(__name__)
PolymorphicRegistryEntry = Union[ConnectorRegistrySourceDefinition, ConnectorRegistryDestinationDefinition]
class ConnectorTypes(str, Enum, metaclass=CaseInsensitiveKeys):
SOURCE = "source"
DESTINATION = "destination"
class ConnectorTypePrimaryKey(str, Enum, metaclass=CaseInsensitiveKeys):
SOURCE = "sourceDefinitionId"
DESTINATION = "destinationDefinitionId"
class StringNullJsonDecoder(json.JSONDecoder):
"""A JSON decoder that converts "null" strings to None."""
def __init__(self, *args, **kwargs):
super().__init__(object_hook=self.object_hook, *args, **kwargs)
def object_hook(self, obj):
return {k: (None if v == "null" else v) for k, v in obj.items()}
def _apply_metrics_to_registry_entry(registry_entry_dict: dict, connector_type: ConnectorTypes, latest_metrics_dict: dict) -> dict:
"""Apply the metrics to the registry entry.
Args:
        registry_entry_dict (dict): The registry entry.
        connector_type (ConnectorTypes): The connector type, used to select the primary-key field.
        latest_metrics_dict (dict): The metrics.
Returns:
dict: The registry entry with metrics.
"""
connector_id = registry_entry_dict[ConnectorTypePrimaryKey[connector_type.value]]
metrics = latest_metrics_dict.get(connector_id, {})
# Safely add metrics to ["generated"]["metrics"], knowing that the key may not exist, or might be None
registry_entry_dict = set_with(registry_entry_dict, "generated.metrics", metrics, default_none_to_dict)
return registry_entry_dict
def _apply_release_candidate_entries(registry_entry_dict: dict, docker_repository_to_rc_registry_entry: dict) -> dict:
"""Apply the optionally existing release candidate entries to the registry entry.
    We need both the release candidate metadata entry and the release candidate registry entry: the metadata entry carries the rollout configuration, while the registry entry carries the definition that gets embedded under releaseCandidates.
Args:
registry_entry_dict (dict): The registry entry.
docker_repository_to_rc_registry_entry (dict): Mapping of docker repository to release candidate registry entry.
Returns:
dict: The registry entry with release candidates applied.
"""
registry_entry_dict = copy.deepcopy(registry_entry_dict)
if registry_entry_dict["dockerRepository"] in docker_repository_to_rc_registry_entry:
release_candidate_registry_entry = docker_repository_to_rc_registry_entry[registry_entry_dict["dockerRepository"]]
registry_entry_dict = _apply_release_candidates(registry_entry_dict, release_candidate_registry_entry)
return registry_entry_dict
def _apply_release_candidates(
latest_registry_entry: dict,
release_candidate_registry_entry: PolymorphicRegistryEntry,
) -> dict:
"""Apply the release candidate entries to the registry entry.
Args:
latest_registry_entry (dict): The latest registry entry.
release_candidate_registry_entry (PolymorphicRegistryEntry): The release candidate registry entry.
Returns:
dict: The registry entry with release candidates applied.
"""
try:
if not release_candidate_registry_entry.releases.rolloutConfiguration.enableProgressiveRollout:
return latest_registry_entry
# Handle if releases or rolloutConfiguration is not present in the release candidate registry entry
except AttributeError:
return latest_registry_entry
    # If the release candidate is older than the latest registry entry, don't apply it and return the latest registry entry unchanged
if semver.Version.parse(release_candidate_registry_entry.dockerImageTag) < semver.Version.parse(
latest_registry_entry["dockerImageTag"]
):
return latest_registry_entry
updated_registry_entry = copy.deepcopy(latest_registry_entry)
updated_registry_entry.setdefault("releases", {})
updated_registry_entry["releases"]["releaseCandidates"] = {
release_candidate_registry_entry.dockerImageTag: to_json_sanitized_dict(release_candidate_registry_entry)
}
return updated_registry_entry
def _build_connector_registry(
latest_registry_entries: list[PolymorphicRegistryEntry], latest_connector_metrics: dict, docker_repository_to_rc_registry_entry: dict
) -> ConnectorRegistryV0:
registry_dict = {"sources": [], "destinations": []}
for latest_registry_entry in latest_registry_entries:
connector_type = _get_connector_type_from_registry_entry(latest_registry_entry)
plural_connector_type = f"{connector_type.value}s"
registry_entry_dict = to_json_sanitized_dict(latest_registry_entry)
enriched_registry_entry_dict = _apply_metrics_to_registry_entry(registry_entry_dict, connector_type, latest_connector_metrics)
enriched_registry_entry_dict = _apply_release_candidate_entries(
enriched_registry_entry_dict, docker_repository_to_rc_registry_entry
)
registry_dict[plural_connector_type].append(enriched_registry_entry_dict)
return ConnectorRegistryV0.parse_obj(registry_dict)
def _convert_json_to_metrics_dict(jsonl_string: str) -> dict:
"""Convert the jsonl string to a metrics dict.
Args:
jsonl_string (str): The jsonl string.
Returns:
dict: The metrics dict.
"""
metrics_dict = defaultdict(dict)
jsonl_lines = jsonl_string.splitlines()
for line in jsonl_lines:
data = json.loads(line, cls=StringNullJsonDecoder)
connector_data = data["_airbyte_data"]
connector_definition_id = connector_data["connector_definition_id"]
airbyte_platform = connector_data["airbyte_platform"]
metrics_dict[connector_definition_id][airbyte_platform] = connector_data
return metrics_dict
def _get_connector_type_from_registry_entry(registry_entry: PolymorphicRegistryEntry) -> ConnectorTypes:
"""Get the connector type from the registry entry.
Args:
registry_entry (PolymorphicRegistryEntry): The registry entry.
Returns:
ConnectorTypes: The connector type.
"""
    if hasattr(registry_entry, ConnectorTypePrimaryKey.SOURCE.value):
        return ConnectorTypes.SOURCE
    elif hasattr(registry_entry, ConnectorTypePrimaryKey.DESTINATION.value):
return ConnectorTypes.DESTINATION
else:
raise ValueError("Registry entry is not a source or destination")
@sentry_sdk.trace
def _get_latest_registry_entries(bucket: storage.Bucket, registry_type: str) -> list[PolymorphicRegistryEntry]:
"""Get the latest registry entries from the GCS bucket.
Args:
bucket (storage.Bucket): The GCS bucket.
registry_type (str): The registry type.
Returns:
list[PolymorphicRegistryEntry]: The latest registry entries.
"""
registry_type_file_name = f"{registry_type}.json"
try:
logger.info(f"Listing blobs in the latest folder: {METADATA_FOLDER}/**/latest/{registry_type_file_name}")
blobs = bucket.list_blobs(match_glob=f"{METADATA_FOLDER}/**/latest/{registry_type_file_name}")
except Exception as e:
logger.error(f"Error listing blobs in the latest folder: {str(e)}")
return []
latest_registry_entries = []
for blob in blobs:
logger.info(f"Reading blob: {blob.name}")
registry_dict = json.loads(safe_read_gcs_file(blob))
try:
if registry_dict.get(ConnectorTypePrimaryKey.SOURCE.value):
registry_model = ConnectorRegistrySourceDefinition.parse_obj(registry_dict)
elif registry_dict.get(ConnectorTypePrimaryKey.DESTINATION.value):
registry_model = ConnectorRegistryDestinationDefinition.parse_obj(registry_dict)
else:
logger.warning(f"Failed to parse registry model for {blob.name}. Skipping.")
continue
except Exception as e:
logger.error(f"Error parsing registry model for {blob.name}: {str(e)}")
continue
latest_registry_entries.append(registry_model)
return latest_registry_entries
@sentry_sdk.trace
def _get_release_candidate_registry_entries(bucket: storage.Bucket, registry_type: str) -> list[PolymorphicRegistryEntry]:
"""Get the release candidate registry entries from the GCS bucket.
Args:
bucket (storage.Bucket): The GCS bucket.
registry_type (str): The registry type.
Returns:
list[PolymorphicRegistryEntry]: The release candidate registry entries.
"""
blobs = bucket.list_blobs(match_glob=f"{METADATA_FOLDER}/**/release_candidate/{registry_type}.json")
release_candidate_registry_entries = []
for blob in blobs:
logger.info(f"Reading blob: {blob.name}")
registry_dict = json.loads(safe_read_gcs_file(blob))
try:
if "/source-" in blob.name:
registry_model = ConnectorRegistrySourceDefinition.parse_obj(registry_dict)
else:
registry_model = ConnectorRegistryDestinationDefinition.parse_obj(registry_dict)
except Exception as e:
logger.error(f"Error parsing registry model for {blob.name}: {str(e)}")
continue
release_candidate_registry_entries.append(registry_model)
return release_candidate_registry_entries
@sentry_sdk.trace
def _get_latest_connector_metrics(bucket: storage.Bucket) -> dict:
"""Get the latest connector metrics from the GCS bucket.
Args:
bucket (storage.Bucket): The GCS bucket.
Returns:
dict: The latest connector metrics.
"""
try:
logger.info(f"Getting blobs in the analytics folder: {ANALYTICS_FOLDER}")
        blobs = list(bucket.list_blobs(prefix=f"{ANALYTICS_FOLDER}/"))  # materialize the iterator so the emptiness check below works
except Exception as e:
logger.error(f"Unexpected error listing blobs at {ANALYTICS_FOLDER}: {str(e)}")
return {}
if not blobs:
raise ValueError("No blobs found in the analytics folder")
    # Pick the blob with the most recent update time
most_recent_blob = max(blobs, key=lambda blob: blob.updated)
latest_metrics_jsonl = safe_read_gcs_file(most_recent_blob)
if latest_metrics_jsonl is None:
logger.warning(f"No metrics found for {most_recent_blob.name}")
return {}
try:
latest_metrics_dict = _convert_json_to_metrics_dict(latest_metrics_jsonl)
except Exception as e:
logger.error(f"Error converting json to metrics dict: {str(e)}")
return {}
return latest_metrics_dict
@sentry_sdk.trace
def _persist_registry(registry: ConnectorRegistryV0, registry_name: str, bucket: storage.Bucket) -> None:
"""Persist the registry to a json file on GCS bucket
Args:
registry (ConnectorRegistryV0): The registry.
registry_name (str): The name of the registry. One of "cloud" or "oss".
bucket (storage.Bucket): The GCS bucket.
Returns:
None
"""
    # TODO: Remove this dev-bucket override once the registry artifacts have been validated, then pass the bucket in as a parameter. This block exists so we can write the registry artifacts to the dev bucket for validation.
gcs_creds = os.environ.get("GCS_DEV_CREDENTIALS")
service_account_info = json.loads(gcs_creds)
credentials = service_account.Credentials.from_service_account_info(service_account_info)
client = storage.Client(credentials=credentials)
bucket = client.bucket("dev-airbyte-cloud-connector-metadata-service")
registry_file_name = f"{registry_name}_registry.json"
registry_file_path = f"{REGISTRIES_FOLDER}/{registry_file_name}"
registry_json = registry.json(exclude_none=True)
registry_json = json.dumps(json.loads(registry_json), sort_keys=True)
try:
logger.info(f"Uploading {registry_name} registry to {registry_file_path}")
blob = bucket.blob(registry_file_path)
blob.upload_from_string(registry_json.encode("utf-8"), content_type="application/json")
logger.info(f"Successfully uploaded {registry_name} registry to {registry_file_path}")
return
except Exception as e:
logger.error(f"Error persisting {registry_file_name} to json: {str(e)}")
raise e
def generate_and_persist_connector_registry(bucket_name: str, registry_type: str) -> None:
"""Generate and persist the registry to a json file on GCS bucket.
Args:
bucket_name (str): The name of the GCS bucket.
registry_type (str): The type of the registry.
    Returns:
        None
"""
if registry_type not in VALID_REGISTRIES:
raise ValueError(f"Invalid registry type: {registry_type}. Valid types are: {', '.join(VALID_REGISTRIES)}.")
gcs_client = get_gcs_storage_client()
registry_bucket = gcs_client.bucket(bucket_name)
analytics_bucket = gcs_client.bucket(ANALYTICS_BUCKET)
latest_registry_entries = _get_latest_registry_entries(registry_bucket, registry_type)
release_candidate_registry_entries = _get_release_candidate_registry_entries(registry_bucket, registry_type)
    docker_repository_to_rc_registry_entry = {
        rc_entry.dockerRepository: rc_entry for rc_entry in release_candidate_registry_entries
    }
latest_connector_metrics = _get_latest_connector_metrics(analytics_bucket)
connector_registry = _build_connector_registry(
latest_registry_entries, latest_connector_metrics, docker_repository_to_rc_registry_entry
)
try:
_persist_registry(connector_registry, registry_type, registry_bucket)
except Exception as e:
message = f"*🤖 🔴 _Registry Generation_ FAILED*:\nFailed to generate and persist {registry_type} registry."
send_slack_message(PUBLISH_UPDATE_CHANNEL, message)
raise e
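
For reference, a small sketch of what the metrics ingestion produces; the JSONL line is invented but mirrors the _airbyte_data shape the code expects:

import json

from metadata_service.registry import StringNullJsonDecoder, _convert_json_to_metrics_dict

jsonl = '{"_airbyte_data": {"connector_definition_id": "abc-123", "airbyte_platform": "cloud", "usage": "null"}}'

# The custom decoder turns "null" strings into real None values.
data = json.loads(jsonl, cls=StringNullJsonDecoder)
assert data["_airbyte_data"]["usage"] is None

# The metrics dict is keyed by connector definition id, then platform.
metrics = _convert_json_to_metrics_dict(jsonl)
# metrics["abc-123"]["cloud"]["usage"] is None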

View File

@@ -0,0 +1,52 @@
#
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
#
import logging
import os
import sentry_sdk
sentry_logger = logging.getLogger("sentry")
def setup_sentry():
"""
    Set up the Sentry SDK if SENTRY_DSN is defined in the environment.
    SENTRY_TRACES_SAMPLE_RATE may be set to a value between 0 and 1; it defaults to 0.
"""
from sentry_sdk.integrations.argv import ArgvIntegration
from sentry_sdk.integrations.atexit import AtexitIntegration
from sentry_sdk.integrations.dedupe import DedupeIntegration
from sentry_sdk.integrations.logging import LoggingIntegration, ignore_logger
from sentry_sdk.integrations.modules import ModulesIntegration
from sentry_sdk.integrations.stdlib import StdlibIntegration
# We ignore the Dagster internal logging to prevent a single error from being logged per node in the job graph
ignore_logger("dagster")
SENTRY_DSN = os.environ.get("SENTRY_DSN")
SENTRY_ENVIRONMENT = os.environ.get("SENTRY_ENVIRONMENT")
TRACES_SAMPLE_RATE = float(os.environ.get("SENTRY_TRACES_SAMPLE_RATE", 0))
sentry_logger.info("Setting up Sentry with")
sentry_logger.info(f"SENTRY_DSN: {SENTRY_DSN}")
sentry_logger.info(f"SENTRY_ENVIRONMENT: {SENTRY_ENVIRONMENT}")
sentry_logger.info(f"SENTRY_TRACES_SAMPLE_RATE: {TRACES_SAMPLE_RATE}")
if SENTRY_DSN:
sentry_sdk.init(
dsn=SENTRY_DSN,
traces_sample_rate=TRACES_SAMPLE_RATE,
environment=SENTRY_ENVIRONMENT,
default_integrations=False,
integrations=[
AtexitIntegration(),
DedupeIntegration(),
StdlibIntegration(),
ModulesIntegration(),
ArgvIntegration(),
LoggingIntegration(),
],
)
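
Tracing stays disabled unless the environment opts in. A minimal sketch of enabling it locally (the DSN is a placeholder, not a real project key):

import os

from metadata_service.sentry import setup_sentry

os.environ["SENTRY_DSN"] = "https://examplePublicKey@o0.ingest.sentry.io/0"  # placeholder
os.environ["SENTRY_ENVIRONMENT"] = "dev"
os.environ["SENTRY_TRACES_SAMPLE_RATE"] = "1.0"

setup_sentry()  # initializes sentry_sdk with the explicit integration list above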

View File

@@ -0,0 +1,107 @@
#
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
#
import json
import logging
import os
from typing import Optional, Set
import dpath.util
import sentry_sdk
import yaml
from google.cloud import storage
from google.oauth2 import service_account
from metadata_service.constants import PUBLISH_UPDATE_CHANNEL, REGISTRIES_FOLDER, SPECS_SECRETS_MASK_FILE_NAME, VALID_REGISTRIES
from metadata_service.helpers.gcs import get_gcs_storage_client, safe_read_gcs_file
from metadata_service.helpers.slack import send_slack_message
from metadata_service.models.generated import ConnectorRegistryV0
from metadata_service.models.transform import to_json_sanitized_dict
from metadata_service.registry import PolymorphicRegistryEntry
logger = logging.getLogger(__name__)
@sentry_sdk.trace
def _get_registries_from_gcs(bucket: storage.Bucket) -> list[ConnectorRegistryV0]:
"""Get the registries from GCS and return a list of ConnectorRegistryV0 objects."""
registries = []
for registry in VALID_REGISTRIES:
registry_name = f"{registry}_registry.json"
try:
logger.info(f"Getting registry {registry_name} from GCS")
blob = bucket.blob(f"{REGISTRIES_FOLDER}/{registry_name}")
registry_dict = json.loads(safe_read_gcs_file(blob))
registries.append(ConnectorRegistryV0.parse_obj(registry_dict))
except Exception as e:
logger.error(f"Error getting registry {registry_name} from GCS: {e}")
raise e
return registries
def _get_specs_secrets_from_registry_entries(entries: list[PolymorphicRegistryEntry]) -> Set[str]:
"""Get the specs secrets from the registry entries and return a set of secret properties."""
secret_properties = set()
for entry in entries:
sanitized_entry = to_json_sanitized_dict(entry)
spec_properties = sanitized_entry["spec"]["connectionSpecification"].get("properties")
if spec_properties is None:
continue
for type_path, _ in dpath.util.search(spec_properties, "**/type", yielded=True):
absolute_path = f"/{type_path}"
if "/" in type_path:
property_path, _ = absolute_path.rsplit(sep="/", maxsplit=1)
else:
property_path = absolute_path
property_definition = dpath.util.get(spec_properties, property_path)
marked_as_secret = property_definition.get("airbyte_secret", False)
if marked_as_secret:
secret_properties.add(property_path.split("/")[-1])
return secret_properties
@sentry_sdk.trace
def _persist_secrets_to_gcs(specs_secrets: Set[str], bucket: storage.Bucket) -> None:
"""Persist the specs secrets to GCS."""
    # TODO: Remove this dev-bucket override once the registry artifacts have been validated, then pass the bucket in as a parameter
gcs_creds = os.environ.get("GCS_DEV_CREDENTIALS")
service_account_info = json.loads(gcs_creds)
credentials = service_account.Credentials.from_service_account_info(service_account_info)
client = storage.Client(credentials=credentials)
bucket = client.bucket("dev-airbyte-cloud-connector-metadata-service")
specs_secrets_mask_blob = bucket.blob(f"{REGISTRIES_FOLDER}/{SPECS_SECRETS_MASK_FILE_NAME}")
try:
logger.info(f"Uploading specs secrets mask to GCS: {specs_secrets_mask_blob.name}")
specs_secrets_mask_blob.upload_from_string(yaml.dump({"properties": sorted(list(specs_secrets))}))
except Exception as e:
logger.error(f"Error uploading specs secrets mask to GCS: {e}")
raise e
def generate_and_persist_specs_secrets_mask(bucket_name: str) -> None:
"""Generate and persist the specs secrets mask to GCS.
Args:
bucket_name (str): The name of the bucket to persist the specs secrets mask to.
Returns:
None
"""
client = get_gcs_storage_client()
bucket = client.bucket(bucket_name)
registries = _get_registries_from_gcs(bucket)
all_entries = [entry for registry in registries for entry in registry.sources + registry.destinations]
all_specs_secrets = _get_specs_secrets_from_registry_entries(all_entries)
try:
_persist_secrets_to_gcs(all_specs_secrets, bucket)
    except Exception as e:
        message = f"*🤖 🔴 _Specs Secrets Mask Generation_ FAILED*:\nFailed to generate and persist `{SPECS_SECRETS_MASK_FILE_NAME}` to registry GCS bucket."
        send_slack_message(PUBLISH_UPDATE_CHANNEL, message)
        raise e
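
To see the dpath traversal in action, a standalone sketch over a toy connectionSpecification (the property names are invented):

import dpath.util

spec_properties = {
    "host": {"type": "string"},
    "credentials": {
        "type": "object",
        "properties": {"api_token": {"type": "string", "airbyte_secret": True}},
    },
}

secret_properties = set()
# Every "type" key marks a property definition; walk up one path segment and
# check whether that definition is flagged airbyte_secret.
for type_path, _ in dpath.util.search(spec_properties, "**/type", yielded=True):
    absolute_path = f"/{type_path}"
    property_path, _ = absolute_path.rsplit(sep="/", maxsplit=1)
    property_definition = dpath.util.get(spec_properties, property_path)
    if property_definition.get("airbyte_secret", False):
        secret_properties.add(property_path.split("/")[-1])

print(secret_properties)  # {'api_token'}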

View File

@@ -13,7 +13,6 @@ import pandas as pd
import requests
import yaml
from github import Auth, Github
from google.cloud import storage
from metadata_service.helpers.gcs import get_gcs_storage_client
from metadata_service.helpers.slack import send_slack_message
@@ -167,7 +166,6 @@ def _get_latest_metadata_entries_on_gcs(bucket_name: str) -> Mapping[str, Any]:
latest_metadata_entries_on_gcs = {}
for blob in blobs:
assert isinstance(blob, storage.Blob)
metadata_dict = yaml.safe_load(blob.download_as_bytes().decode("utf-8"))
connector_metadata = ConnectorMetadataDefinitionV0.parse_obj(metadata_dict)
latest_metadata_entries_on_gcs[connector_metadata.data.dockerRepository] = connector_metadata.data.dockerImageTag

View File

@@ -459,6 +459,18 @@ idna = ["idna (>=3.7)"]
trio = ["trio (>=0.23)"]
wmi = ["wmi (>=1.5.1)"]
[[package]]
name = "dpath"
version = "2.2.0"
description = "Filesystem-like pathing and searching for dictionaries"
optional = false
python-versions = ">=3.7"
groups = ["main"]
files = [
{file = "dpath-2.2.0-py3-none-any.whl", hash = "sha256:b330a375ded0a0d2ed404440f6c6a715deae5313af40bbb01c8a41d891900576"},
{file = "dpath-2.2.0.tar.gz", hash = "sha256:34f7e630dc55ea3f219e555726f5da4b4b25f2200319c8e6902c394258dd6a3e"},
]
[[package]]
name = "email-validator"
version = "2.2.0"
@@ -2205,6 +2217,63 @@ files = [
{file = "semver-3.0.4.tar.gz", hash = "sha256:afc7d8c584a5ed0a11033af086e8af226a9c0b206f313e0301f8dd7b6b589602"},
]
[[package]]
name = "sentry-sdk"
version = "2.34.1"
description = "Python client for Sentry (https://sentry.io)"
optional = false
python-versions = ">=3.6"
groups = ["main"]
files = [
{file = "sentry_sdk-2.34.1-py2.py3-none-any.whl", hash = "sha256:b7a072e1cdc5abc48101d5146e1ae680fa81fe886d8d95aaa25a0b450c818d32"},
{file = "sentry_sdk-2.34.1.tar.gz", hash = "sha256:69274eb8c5c38562a544c3e9f68b5be0a43be4b697f5fd385bf98e4fbe672687"},
]
[package.dependencies]
certifi = "*"
urllib3 = ">=1.26.11"
[package.extras]
aiohttp = ["aiohttp (>=3.5)"]
anthropic = ["anthropic (>=0.16)"]
arq = ["arq (>=0.23)"]
asyncpg = ["asyncpg (>=0.23)"]
beam = ["apache-beam (>=2.12)"]
bottle = ["bottle (>=0.12.13)"]
celery = ["celery (>=3)"]
celery-redbeat = ["celery-redbeat (>=2)"]
chalice = ["chalice (>=1.16.0)"]
clickhouse-driver = ["clickhouse-driver (>=0.2.0)"]
django = ["django (>=1.8)"]
falcon = ["falcon (>=1.4)"]
fastapi = ["fastapi (>=0.79.0)"]
flask = ["blinker (>=1.1)", "flask (>=0.11)", "markupsafe"]
grpcio = ["grpcio (>=1.21.1)", "protobuf (>=3.8.0)"]
http2 = ["httpcore[http2] (==1.*)"]
httpx = ["httpx (>=0.16.0)"]
huey = ["huey (>=2)"]
huggingface-hub = ["huggingface_hub (>=0.22)"]
langchain = ["langchain (>=0.0.210)"]
launchdarkly = ["launchdarkly-server-sdk (>=9.8.0)"]
litestar = ["litestar (>=2.0.0)"]
loguru = ["loguru (>=0.5)"]
openai = ["openai (>=1.0.0)", "tiktoken (>=0.3.0)"]
openfeature = ["openfeature-sdk (>=0.7.1)"]
opentelemetry = ["opentelemetry-distro (>=0.35b0)"]
opentelemetry-experimental = ["opentelemetry-distro"]
pure-eval = ["asttokens", "executing", "pure_eval"]
pymongo = ["pymongo (>=3.1)"]
pyspark = ["pyspark (>=2.4.4)"]
quart = ["blinker (>=1.1)", "quart (>=0.16.1)"]
rq = ["rq (>=0.6)"]
sanic = ["sanic (>=0.8)"]
sqlalchemy = ["sqlalchemy (>=1.2)"]
starlette = ["starlette (>=0.19.1)"]
starlite = ["starlite (>=1.48)"]
statsig = ["statsig (>=0.55.3)"]
tornado = ["tornado (>=6)"]
unleash = ["UnleashClient (>=6.0.1)"]
[[package]]
name = "six"
version = "1.17.0"
@@ -2421,4 +2490,4 @@ zstd = ["zstandard (>=0.18.0)"]
[metadata]
lock-version = "2.1"
python-versions = "^3.11"
content-hash = "c87174c60511ea2194170ca8410ed909abf14c6b2b9a315342f679d754b1551c"
content-hash = "0f0c3ed7b5ee4938d11da83397ce862cf7572eb2570ece47fed6e1af923512c1"

View File

@@ -1,6 +1,6 @@
[tool.poetry]
name = "metadata-service"
version = "0.27.0"
version = "0.28.0"
description = ""
authors = ["Airbyte <contact@airbyte.io>"]
readme = "README.md"
@@ -21,6 +21,8 @@ pygithub = "^2.7.0"
pandas = "^2.3.1"
tabulate = "^0.9.0"
slack-sdk = "^3.36.0"
dpath = "^2.2.0"
sentry-sdk = "^2.34.1"
[tool.poetry.group.dev.dependencies]

View File

@@ -2,6 +2,7 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
from datetime import datetime
from pathlib import Path
from typing import Optional
@@ -19,6 +20,7 @@ from metadata_service.constants import (
RELEASE_CANDIDATE_GCS_FOLDER_NAME,
)
from metadata_service.models.generated.ConnectorMetadataDefinitionV0 import ConnectorMetadataDefinitionV0
from metadata_service.models.generated.GitInfo import GitInfo
from metadata_service.models.transform import to_json_sanitized_dict
from metadata_service.validators.metadata_validator import ValidatorOptions
@@ -55,6 +57,68 @@ def mock_local_doc_path_exists(monkeypatch):
monkeypatch.setattr(Path, "exists", fake_exists)
@pytest.fixture
def temp_manifest_content():
"""Sample manifest.yaml content for testing."""
return """
# Temporary manifest file for testing
description: Test manifest
author: Test Author
version: 1.0.0
""".strip()
@pytest.fixture
def temp_components_content():
"""Sample components.py content for testing."""
return """
# Temporary components.py file for testing
def test_component():
pass
""".strip()
@pytest.fixture
def temp_metadata_directory(
tmp_path, valid_metadata_upload_files, manifest_exists, components_py_exists, temp_manifest_content, temp_components_content
):
"""Create a temporary directory structure with optional manifest and components files based on test parameters."""
# Copy base metadata.yaml from existing fixture to temp directory
base_metadata_file = valid_metadata_upload_files[0]
temp_metadata_path = tmp_path / "metadata.yaml"
# Copy the content from the base metadata file
with open(base_metadata_file, "r") as f:
temp_metadata_path.write_text(f.read())
# Conditionally create manifest.yaml based on test parameter
if manifest_exists:
manifest_path = tmp_path / MANIFEST_FILE_NAME
manifest_path.write_text(temp_manifest_content)
# Conditionally create components.py based on test parameter
if components_py_exists:
components_path = tmp_path / COMPONENTS_PY_FILE_NAME
components_path.write_text(temp_components_content)
return temp_metadata_path
@pytest.fixture
def mock_git_operations(mocker):
"""Mock Git operations to avoid repository issues with temporary files."""
# Create a proper GitInfo Pydantic model instance
mock_git_info = GitInfo(
commit_sha="abc123def456",
commit_timestamp=datetime(2024, 1, 1, 0, 0, 0),
commit_author="Test Author",
commit_author_email="test@example.com",
)
mocker.patch("metadata_service.gcs_upload._get_git_info_for_file", return_value=mock_git_info)
return mock_git_info
# Custom Assertions
@@ -346,7 +410,6 @@ def test_upload_metadata_to_gcs_valid_metadata(
mocker.spy(gcs_upload, "_file_upload")
mocker.spy(gcs_upload, "upload_file_if_changed")
for valid_metadata_upload_file in valid_metadata_upload_files:
print("\nTesting upload of valid metadata file: " + valid_metadata_upload_file)
metadata_file_path = Path(valid_metadata_upload_file)
metadata = ConnectorMetadataDefinitionV0.parse_obj(yaml.safe_load(metadata_file_path.read_text()))
mocks = setup_upload_mocks(
@@ -484,7 +547,6 @@ def test_upload_invalid_metadata_to_gcs(mocker, invalid_metadata_yaml_files):
# Test that all invalid metadata files throw a ValueError
for invalid_metadata_file in invalid_metadata_yaml_files:
print("\nTesting upload of invalid metadata file: " + invalid_metadata_file)
metadata_file_path = Path(invalid_metadata_file)
error_match_if_validation_fails_as_expected = "Validation error"
@@ -504,7 +566,6 @@ def test_upload_metadata_to_gcs_invalid_docker_images(mocker, invalid_metadata_u
# Test that valid metadata files that reference invalid docker images throw a ValueError
for invalid_metadata_file in invalid_metadata_upload_files:
print("\nTesting upload of valid metadata file with invalid docker image: " + invalid_metadata_file)
metadata_file_path = Path(invalid_metadata_file)
error_match_if_validation_fails_as_expected = "does not exist in DockerHub"
@@ -528,7 +589,6 @@ def test_upload_metadata_to_gcs_with_prerelease(mocker, valid_metadata_upload_fi
if tmp_metadata_file_path.exists():
tmp_metadata_file_path.unlink()
print("\nTesting prerelease upload of valid metadata file: " + valid_metadata_upload_file)
metadata_file_path = Path(valid_metadata_upload_file)
metadata = ConnectorMetadataDefinitionV0.parse_obj(yaml.safe_load(metadata_file_path.read_text()))
expected_version_key = f"metadata/{metadata.data.dockerRepository}/{prerelease_image_tag}/{METADATA_FILE_NAME}"
@@ -691,34 +751,44 @@ def test_upload_metadata_to_gcs_release_candidate(mocker, get_fixture_path, tmp_
@pytest.mark.parametrize(
"manifest_exists, components_py_exists",
[
(True, True),
(True, False),
(False, True),
(False, False),
(True, True), # Both files exist
(True, False), # Only manifest exists
(False, True), # Only components.py exists
(False, False), # Neither file exists
],
)
def test_upload_metadata_to_gcs_with_manifest_files(
mocker, valid_metadata_upload_files, tmp_path, monkeypatch, manifest_exists, components_py_exists
mocker, temp_metadata_directory, tmp_path, manifest_exists, components_py_exists, mock_git_operations
):
mocker.spy(gcs_upload, "_file_upload")
mocker.spy(gcs_upload, "upload_file_if_changed")
valid_metadata_upload_file = valid_metadata_upload_files[0]
metadata_file_path = Path(valid_metadata_upload_file)
# Use the temporary metadata file created by the fixture
metadata_file_path = temp_metadata_directory
expected_manifest_file_path = metadata_file_path.parent / MANIFEST_FILE_NAME
expected_components_py_file_path = metadata_file_path.parent / COMPONENTS_PY_FILE_NAME
# Mock file paths to conditionally exist
original_exists = Path.exists
# No more Path.exists mocking needed - files either exist or don't based on fixture creation!
# Git operations are mocked by the mock_git_operations fixture!
def fake_exists(self):
if self == expected_manifest_file_path:
return manifest_exists
if self == expected_components_py_file_path:
return components_py_exists
return original_exists(self)
monkeypatch.setattr(Path, "exists", fake_exists)
# Mock the _safe_load_metadata_file function to bypass YAML parsing issues in test environment
sample_metadata = {
"metadataSpecVersion": "1.0",
"data": {
"name": "Test Connector",
"definitionId": "12345678-1234-1234-1234-123456789012",
"connectorType": "source",
"dockerRepository": "airbyte/source-exists-test",
"dockerImageTag": "0.1.0",
"documentationUrl": "https://docs.airbyte.com/test",
"license": "MIT",
"githubIssueLabel": "source-test",
"connectorSubtype": "api",
"releaseStage": "alpha",
"tags": ["language:python"],
},
}
mocker.patch("metadata_service.gcs_upload._safe_load_metadata_file", return_value=sample_metadata)
# mock create_zip_and_get_sha256
mocker.patch.object(gcs_upload, "create_zip_and_get_sha256", mocker.Mock(return_value="fake_zip_sha256"))

View File

@@ -0,0 +1,342 @@
#
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
#
import json
import os
from unittest.mock import Mock, patch
import pytest
from google.cloud import storage
from google.oauth2 import service_account
from metadata_service.constants import REGISTRIES_FOLDER
from metadata_service.models.generated import ConnectorRegistryV0
from metadata_service.registry import (
ConnectorTypePrimaryKey,
ConnectorTypes,
PolymorphicRegistryEntry,
_apply_metrics_to_registry_entry,
_build_connector_registry,
_convert_json_to_metrics_dict,
_get_connector_type_from_registry_entry,
_persist_registry,
)
class TestGetConnectorTypeFromRegistryEntry:
"""Tests for _get_connector_type_from_registry_entry function."""
@pytest.fixture
def mock_source_entry(self):
"""Create a mock source registry entry."""
entry = Mock(spec=PolymorphicRegistryEntry)
setattr(entry, ConnectorTypePrimaryKey.SOURCE.value, "test-source-id")
return entry
@pytest.fixture
def mock_destination_entry(self):
"""Create a mock destination registry entry."""
entry = Mock(spec=PolymorphicRegistryEntry)
setattr(entry, ConnectorTypePrimaryKey.DESTINATION.value, "test-destination-id")
return entry
@pytest.fixture
def mock_invalid_entry(self):
"""Create a mock entry that is neither source nor destination."""
entry = Mock(spec=PolymorphicRegistryEntry)
return entry
@pytest.mark.parametrize(
"entry_fixture,expected_type,description",
[
("mock_source_entry", ConnectorTypes.SOURCE, "source entry"),
("mock_destination_entry", ConnectorTypes.DESTINATION, "destination entry"),
],
)
def test_get_connector_type_from_registry_entry_types(self, entry_fixture, expected_type, description, request):
"""Test connector type detection from registry entries."""
registry_entry = request.getfixturevalue(entry_fixture)
result = _get_connector_type_from_registry_entry(registry_entry)
assert result == expected_type
assert isinstance(result, ConnectorTypes)
@pytest.mark.parametrize(
"entry_fixture,expected_type,has_attribute,not_has_attribute,description",
[
(
"mock_source_entry",
ConnectorTypes.SOURCE,
ConnectorTypePrimaryKey.SOURCE.value,
ConnectorTypePrimaryKey.DESTINATION.value,
"source entry",
),
(
"mock_destination_entry",
ConnectorTypes.DESTINATION,
ConnectorTypePrimaryKey.DESTINATION.value,
ConnectorTypePrimaryKey.SOURCE.value,
"destination entry",
),
],
)
def test_get_connector_type_from_registry_entry_has_correct_attribute(
self, entry_fixture, expected_type, has_attribute, not_has_attribute, description, request
):
registry_entry = request.getfixturevalue(entry_fixture)
assert hasattr(registry_entry, has_attribute)
assert not hasattr(registry_entry, not_has_attribute)
result = _get_connector_type_from_registry_entry(registry_entry)
assert result == expected_type
def test_get_connector_type_from_registry_entry_invalid_raises_error(self, mock_invalid_entry):
"""Test that invalid entry raises ValueError."""
assert not hasattr(mock_invalid_entry, ConnectorTypePrimaryKey.SOURCE.value)
assert not hasattr(mock_invalid_entry, ConnectorTypePrimaryKey.DESTINATION.value)
with pytest.raises(ValueError) as exc_info:
_get_connector_type_from_registry_entry(mock_invalid_entry)
assert "Registry entry is not a source or destination" in str(exc_info.value)
class TestConvertJsonToMetricsDict:
"""Tests for _convert_json_to_metrics_dict function."""
@pytest.mark.parametrize(
"jsonl_input,expected_output,description",
[
(
'{"_airbyte_data": {"connector_definition_id": "conn-123", "airbyte_platform": "cloud", "usage": 100}}',
{"conn-123": {"cloud": {"connector_definition_id": "conn-123", "airbyte_platform": "cloud", "usage": 100}}},
"single connector",
),
(
'{"_airbyte_data": {"connector_definition_id": "conn-123", "airbyte_platform": "cloud", "usage": 100}}\n{"_airbyte_data": {"connector_definition_id": "conn-456", "airbyte_platform": "oss", "usage": 50}}',
{
"conn-123": {"cloud": {"connector_definition_id": "conn-123", "airbyte_platform": "cloud", "usage": 100}},
"conn-456": {"oss": {"connector_definition_id": "conn-456", "airbyte_platform": "oss", "usage": 50}},
},
"multiple connectors",
),
("", {}, "empty input"),
],
)
def test_convert_json_to_metrics_dict_valid_jsonl(self, jsonl_input, expected_output, description):
"""Test JSONL string conversion to metrics dictionary."""
result = _convert_json_to_metrics_dict(jsonl_input)
assert result == expected_output
class TestApplyMetricsToRegistryEntry:
"""Tests for _apply_metrics_to_registry_entry function."""
@pytest.mark.parametrize(
"connector_type,registry_entry,metrics_dict,expected_metrics,description",
[
(
ConnectorTypes.SOURCE,
{"sourceDefinitionId": "source-123", "name": "Test Source"},
{"source-123": {"cloud": {"usage": 100}}},
{"cloud": {"usage": 100}},
"source with matching metrics",
),
(
ConnectorTypes.DESTINATION,
{"destinationDefinitionId": "dest-456", "name": "Test Destination"},
{"dest-456": {"oss": {"usage": 50}}},
{"oss": {"usage": 50}},
"destination with matching metrics",
),
(
ConnectorTypes.SOURCE,
{"sourceDefinitionId": "source-999", "name": "No Metrics Source"},
{},
{},
"entry with no matching metrics",
),
],
)
def test_apply_metrics_to_registry_entry_scenarios(self, connector_type, registry_entry, metrics_dict, expected_metrics, description):
"""Test metrics application to registry entries."""
result = _apply_metrics_to_registry_entry(registry_entry, connector_type, metrics_dict)
assert result["generated"]["metrics"] == expected_metrics
assert result["name"] == registry_entry["name"]
def test_apply_metrics_to_registry_entry_preserves_existing_structure(self):
"""Test that existing registry entry structure is preserved."""
registry_entry = {"sourceDefinitionId": "source-123", "name": "Test Source", "existing_field": "value"}
metrics_dict = {"source-123": {"cloud": {"usage": 100}}}
result = _apply_metrics_to_registry_entry(registry_entry, ConnectorTypes.SOURCE, metrics_dict)
assert result["existing_field"] == "value"
assert result["name"] == "Test Source"
assert result["generated"]["metrics"] == {"cloud": {"usage": 100}}
class TestBuildConnectorRegistry:
"""Tests for _build_connector_registry function."""
@pytest.mark.parametrize(
"entry_dicts,expected_sources_count,expected_destinations_count,description",
[
(
[
{
"sourceDefinitionId": "550e8400-e29b-41d4-a716-446655440001",
"name": "Source 1",
"dockerRepository": "test/source",
"dockerImageTag": "1.0.0",
"documentationUrl": "https://docs.test.com",
"spec": {},
},
{
"destinationDefinitionId": "550e8400-e29b-41d4-a716-446655440002",
"name": "Destination 1",
"dockerRepository": "test/dest",
"dockerImageTag": "1.0.0",
"documentationUrl": "https://docs.test.com",
"spec": {},
},
],
1,
1,
"mixed sources and destinations",
),
(
[
{
"sourceDefinitionId": "550e8400-e29b-41d4-a716-446655440001",
"name": "Source 1",
"dockerRepository": "test/source1",
"dockerImageTag": "1.0.0",
"documentationUrl": "https://docs.test.com",
"spec": {},
},
{
"sourceDefinitionId": "550e8400-e29b-41d4-a716-446655440002",
"name": "Source 2",
"dockerRepository": "test/source2",
"dockerImageTag": "1.0.0",
"documentationUrl": "https://docs.test.com",
"spec": {},
},
],
2,
0,
"sources only",
),
([], 0, 0, "empty entries"),
],
)
def test_build_connector_registry_scenarios(self, entry_dicts, expected_sources_count, expected_destinations_count, description):
"""Test registry building with different entry combinations."""
entries = []
for entry_dict in entry_dicts:
entry = Mock(spec=PolymorphicRegistryEntry)
if "sourceDefinitionId" in entry_dict:
setattr(entry, ConnectorTypePrimaryKey.SOURCE.value, entry_dict["sourceDefinitionId"])
if "destinationDefinitionId" in entry_dict:
setattr(entry, ConnectorTypePrimaryKey.DESTINATION.value, entry_dict["destinationDefinitionId"])
entries.append(entry)
with (
patch("metadata_service.registry.to_json_sanitized_dict") as mock_sanitize,
patch("metadata_service.registry._apply_metrics_to_registry_entry") as mock_apply_metrics,
patch("metadata_service.registry._apply_release_candidate_entries") as mock_apply_rc,
):
mock_sanitize.side_effect = entry_dicts
mock_apply_metrics.side_effect = lambda x, *args: x
mock_apply_rc.side_effect = lambda x, *args: x
result = _build_connector_registry(entries, {}, {})
assert isinstance(result, ConnectorRegistryV0)
assert len(result.sources) == expected_sources_count
assert len(result.destinations) == expected_destinations_count
def test_build_connector_registry_applies_metrics_and_rc(self):
"""Test that metrics and release candidates are properly applied."""
source_entry = Mock(spec=PolymorphicRegistryEntry)
setattr(source_entry, ConnectorTypePrimaryKey.SOURCE.value, "550e8400-e29b-41d4-a716-446655440001")
entry_dict = {
"sourceDefinitionId": "550e8400-e29b-41d4-a716-446655440001",
"name": "Test Source",
"dockerRepository": "test/source",
"dockerImageTag": "1.0.0",
"documentationUrl": "https://docs.test.com",
"spec": {},
}
with (
patch("metadata_service.registry.to_json_sanitized_dict") as mock_sanitize,
patch("metadata_service.registry._apply_metrics_to_registry_entry") as mock_apply_metrics,
patch("metadata_service.registry._apply_release_candidate_entries") as mock_apply_rc,
):
mock_sanitize.return_value = entry_dict
mock_apply_metrics.return_value = entry_dict
mock_apply_rc.return_value = entry_dict
result = _build_connector_registry([source_entry], {}, {})
mock_apply_metrics.assert_called_once()
mock_apply_rc.assert_called_once()
assert len(result.sources) == 1
class TestPersistRegistryToJson:
"""Tests for _persist_registry_to_json function."""
@pytest.fixture
def mock_registry(self):
"""Create a mock registry object."""
registry = Mock(spec=ConnectorRegistryV0)
registry.json.return_value = '{"sources": [], "destinations": []}'
return registry
@pytest.fixture
def mock_gcs_credentials(self):
"""Mock GCS credentials environment variable."""
return {
"type": "service_account",
"project_id": "test-project",
"private_key": "-----BEGIN PRIVATE KEY-----\ntest-key\n-----END PRIVATE KEY-----\n",
"client_email": "test@test-project.iam.gserviceaccount.com",
}
@pytest.mark.parametrize(
"registry_type,expected_filename,description",
[
("cloud", "cloud_registry.json", "cloud registry"),
("oss", "oss_registry.json", "oss registry"),
],
)
def test_persist_registry_success(self, mock_registry, mock_gcs_credentials, registry_type, expected_filename, description):
"""Test successful registry persistence to GCS."""
with (
patch.dict(os.environ, {"GCS_DEV_CREDENTIALS": json.dumps(mock_gcs_credentials)}),
patch("metadata_service.registry.service_account.Credentials.from_service_account_info") as mock_creds,
patch("metadata_service.registry.storage.Client") as mock_client_class,
):
mock_client = Mock()
mock_client_class.return_value = mock_client
mock_bucket = Mock()
mock_client.bucket.return_value = mock_bucket
mock_blob = Mock()
mock_bucket.blob.return_value = mock_blob
_persist_registry(mock_registry, registry_type, Mock())
mock_bucket.blob.assert_called_once_with(f"{REGISTRIES_FOLDER}/{expected_filename}")
mock_blob.upload_from_string.assert_called_once()

View File

@@ -0,0 +1,346 @@
#
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
#
import json
import os
from unittest.mock import Mock, patch
import pytest
import yaml
from google.cloud import storage
from google.oauth2 import service_account
from metadata_service.constants import REGISTRIES_FOLDER, SPECS_SECRETS_MASK_FILE_NAME, VALID_REGISTRIES
from metadata_service.models.generated import ConnectorRegistryV0
from metadata_service.registry import PolymorphicRegistryEntry
from metadata_service.specs_secrets_mask import _get_registries_from_gcs, _get_specs_secrets_from_registry_entries, _persist_secrets_to_gcs
class TestGetRegistriesFromGcs:
"""Tests for _get_registries_from_gcs function."""
@pytest.fixture
def mock_bucket(self):
"""Create a mock GCS bucket."""
return Mock(spec=storage.Bucket)
@pytest.fixture
def valid_registry_data(self):
"""Sample valid registry data."""
return {
"sources": [],
"destinations": [],
}
@pytest.fixture
def mock_blob(self):
"""Create a mock GCS blob."""
return Mock(spec=storage.Blob)
def test_get_registries_from_gcs_success(self, mock_bucket, mock_blob, valid_registry_data):
"""Test successful retrieval of all valid registries from GCS."""
mock_bucket.blob.return_value = mock_blob
with (
patch("metadata_service.specs_secrets_mask.safe_read_gcs_file") as mock_safe_read,
patch("metadata_service.specs_secrets_mask.ConnectorRegistryV0.parse_obj") as mock_parse,
):
mock_safe_read.return_value = json.dumps(valid_registry_data)
mock_registry = Mock(spec=ConnectorRegistryV0)
mock_parse.return_value = mock_registry
result = _get_registries_from_gcs(mock_bucket)
assert len(result) == len(VALID_REGISTRIES)
assert all(registry == mock_registry for registry in result)
expected_calls = [f"{REGISTRIES_FOLDER}/{registry}_registry.json" for registry in VALID_REGISTRIES]
actual_calls = [call[0][0] for call in mock_bucket.blob.call_args_list]
assert actual_calls == expected_calls
assert mock_safe_read.call_count == len(VALID_REGISTRIES)
def test_get_registries_from_gcs_multiple_registries(self, mock_bucket, mock_blob, valid_registry_data):
"""Test that the function correctly processes all registries in VALID_REGISTRIES."""
mock_bucket.blob.return_value = mock_blob
with (
patch("metadata_service.specs_secrets_mask.safe_read_gcs_file") as mock_safe_read,
patch("metadata_service.specs_secrets_mask.ConnectorRegistryV0.parse_obj") as mock_parse,
):
mock_safe_read.return_value = json.dumps(valid_registry_data)
mock_registry = Mock(spec=ConnectorRegistryV0)
mock_parse.return_value = mock_registry
result = _get_registries_from_gcs(mock_bucket)
assert len(result) == 2
assert mock_parse.call_count == 2
assert mock_safe_read.call_count == 2
class TestGetSpecsSecretsFromRegistryEntries:
"""Tests for _get_specs_secrets_from_registry_entries function."""
@pytest.fixture
def mock_registry_entry(self):
"""Create a mock registry entry."""
return Mock(spec=PolymorphicRegistryEntry)
@pytest.fixture
def single_secret_entry_data(self):
"""Sample entry data with a single secret property."""
return {
"spec": {
"connectionSpecification": {
"properties": {
"password": {"type": "string", "airbyte_secret": True},
"username": {"type": "string", "airbyte_secret": False},
}
}
}
}
@pytest.fixture
def multiple_secrets_entry_data(self):
"""Sample entry data with multiple secret properties."""
return {
"spec": {
"connectionSpecification": {
"properties": {
"password": {"type": "string", "airbyte_secret": True},
"api_key": {"type": "string", "airbyte_secret": True},
"username": {"type": "string", "airbyte_secret": False},
}
}
}
}
@pytest.fixture
def nested_secrets_entry_data(self):
"""Sample entry data with nested secret properties."""
return {
"spec": {
"connectionSpecification": {
"properties": {
"oauth": {
"type": "object",
"properties": {
"client_secret": {"type": "string", "airbyte_secret": True},
"client_id": {"type": "string", "airbyte_secret": False},
},
},
"username": {"type": "string", "airbyte_secret": False},
}
}
}
}
@pytest.fixture
def deeply_nested_secrets_entry_data(self):
"""Sample entry data with deeply nested secret properties."""
return {
"spec": {
"connectionSpecification": {
"properties": {
"connection": {
"type": "object",
"properties": {
"auth": {
"type": "object",
"properties": {
"credentials": {
"type": "object",
"properties": {"secret_token": {"type": "string", "airbyte_secret": True}},
}
},
}
},
}
}
}
}
}
@pytest.fixture
def no_secrets_entry_data(self):
"""Sample entry data with no secret properties."""
return {
"spec": {
"connectionSpecification": {
"properties": {"username": {"type": "string", "airbyte_secret": False}, "host": {"type": "string"}}
}
}
}
@pytest.mark.parametrize(
"entry_data_fixture,expected_secrets,description",
[
("single_secret_entry_data", {"password"}, "single secret property"),
("multiple_secrets_entry_data", {"password", "api_key"}, "multiple secret properties"),
("nested_secrets_entry_data", {"client_secret"}, "nested secret property"),
("deeply_nested_secrets_entry_data", {"secret_token"}, "deeply nested secret property"),
("no_secrets_entry_data", set(), "no secret properties"),
],
)
def test_get_specs_secrets_valid_structures(self, mock_registry_entry, entry_data_fixture, expected_secrets, description, request):
"""Test extraction from various valid entry structures."""
entry_data = request.getfixturevalue(entry_data_fixture)
with patch("metadata_service.specs_secrets_mask.to_json_sanitized_dict") as mock_sanitize:
mock_sanitize.return_value = entry_data
result = _get_specs_secrets_from_registry_entries([mock_registry_entry])
assert result == expected_secrets, f"Failed for {description}"
mock_sanitize.assert_called_once_with(mock_registry_entry)
def test_get_specs_secrets_multiple_entries_aggregation(self, mock_registry_entry, single_secret_entry_data, nested_secrets_entry_data):
"""Test that secrets from multiple entries are properly aggregated."""
entry1 = Mock(spec=PolymorphicRegistryEntry)
entry2 = Mock(spec=PolymorphicRegistryEntry)
with patch("metadata_service.specs_secrets_mask.to_json_sanitized_dict") as mock_sanitize:
mock_sanitize.side_effect = [single_secret_entry_data, nested_secrets_entry_data]
result = _get_specs_secrets_from_registry_entries([entry1, entry2])
assert result == {"password", "client_secret"}
assert mock_sanitize.call_count == 2
def test_get_specs_secrets_duplicate_secrets_handling(self, single_secret_entry_data):
"""Test that duplicate secret names from different entries are handled correctly."""
entry1 = Mock(spec=PolymorphicRegistryEntry)
entry2 = Mock(spec=PolymorphicRegistryEntry)
with patch("metadata_service.specs_secrets_mask.to_json_sanitized_dict") as mock_sanitize:
mock_sanitize.return_value = single_secret_entry_data
result = _get_specs_secrets_from_registry_entries([entry1, entry2])
assert result == {"password"}
assert mock_sanitize.call_count == 2
def test_get_specs_secrets_empty_entries_list(self):
"""Test behavior with empty entries list."""
result = _get_specs_secrets_from_registry_entries([])
assert result == set()
def test_get_specs_secrets_complex_real_world_structure(self, mock_registry_entry):
"""Test with realistic connector specification structure."""
complex_entry_data = {
"spec": {
"connectionSpecification": {
"properties": {
"host": {"type": "string"},
"port": {"type": "integer"},
"database": {"type": "string"},
"credentials": {
"type": "object",
"oneOf": [
{
"properties": {
"auth_type": {"type": "string", "const": "username_password"},
"username": {"type": "string"},
"password": {"type": "string", "airbyte_secret": True},
}
},
{
"properties": {
"auth_type": {"type": "string", "const": "oauth2"},
"client_id": {"type": "string"},
"client_secret": {"type": "string", "airbyte_secret": True},
"refresh_token": {"type": "string", "airbyte_secret": True},
}
},
],
},
"ssl_config": {
"type": "object",
"properties": {"ssl_mode": {"type": "string"}, "client_key": {"type": "string", "airbyte_secret": True}},
},
}
}
}
}
with patch("metadata_service.specs_secrets_mask.to_json_sanitized_dict") as mock_sanitize:
mock_sanitize.return_value = complex_entry_data
result = _get_specs_secrets_from_registry_entries([mock_registry_entry])
expected_secrets = {"password", "client_secret", "refresh_token", "client_key"}
assert result == expected_secrets
class TestPersistSecretsToGcs:
"""Tests for _persist_secrets_to_gcs function."""
@pytest.fixture
def mock_bucket(self):
"""Create a mock GCS bucket."""
return Mock(spec=storage.Bucket)
@pytest.fixture
def mock_blob(self):
"""Create a mock GCS blob."""
mock_blob = Mock()
mock_blob.name = f"{REGISTRIES_FOLDER}/{SPECS_SECRETS_MASK_FILE_NAME}"
return mock_blob
@pytest.fixture
def mock_gcs_credentials(self):
"""Mock GCS credentials environment variable."""
return {
"type": "service_account",
"project_id": "test-project",
"private_key_id": "test-key-id",
"private_key": "-----BEGIN PRIVATE KEY-----\ntest-key\n-----END PRIVATE KEY-----\n",
"client_email": "test@test-project.iam.gserviceaccount.com",
"client_id": "123456789",
"auth_uri": "https://accounts.google.com/o/oauth2/auth",
"token_uri": "https://oauth2.googleapis.com/token",
}
@pytest.mark.parametrize(
"secrets_set,expected_yaml_content,description",
[
(set(), {"properties": []}, "empty secrets set"),
({"password"}, {"properties": ["password"]}, "single secret"),
({"password", "api_key", "token"}, {"properties": ["api_key", "password", "token"]}, "multiple secrets sorted"),
({"z_secret", "a_secret", "m_secret"}, {"properties": ["a_secret", "m_secret", "z_secret"]}, "secrets sorted alphabetically"),
],
)
def test_persist_secrets_to_gcs_various_secret_sets(
self, mock_bucket, mock_blob, mock_gcs_credentials, secrets_set, expected_yaml_content, description
):
"""Test persistence with different secret set sizes and contents."""
with (
patch.dict(os.environ, {"GCS_DEV_CREDENTIALS": json.dumps(mock_gcs_credentials)}),
patch("metadata_service.specs_secrets_mask.service_account.Credentials.from_service_account_info") as mock_creds,
patch("metadata_service.specs_secrets_mask.storage.Client") as mock_client_class,
):
mock_credentials = Mock(spec=service_account.Credentials)
mock_creds.return_value = mock_credentials
mock_client = Mock()
mock_client_class.return_value = mock_client
mock_dev_bucket = Mock(spec=storage.Bucket)
mock_client.bucket.return_value = mock_dev_bucket
mock_dev_bucket.blob.return_value = mock_blob
_persist_secrets_to_gcs(secrets_set, mock_bucket)
mock_creds.assert_called_once_with(mock_gcs_credentials)
mock_client_class.assert_called_once_with(credentials=mock_credentials)
mock_client.bucket.assert_called_once_with("dev-airbyte-cloud-connector-metadata-service")
mock_dev_bucket.blob.assert_called_once_with(f"{REGISTRIES_FOLDER}/{SPECS_SECRETS_MASK_FILE_NAME}")
mock_blob.upload_from_string.assert_called_once()
uploaded_content = mock_blob.upload_from_string.call_args[0][0]
parsed_yaml = yaml.safe_load(uploaded_content)
assert parsed_yaml == expected_yaml_content, f"Failed for {description}"

View File

@@ -135,7 +135,6 @@ def test_get_latest_metadata_entries_on_gcs_success(mock_gcs_blobs):
with (
patch("metadata_service.stale_metadata_report.get_gcs_storage_client") as mock_get_client,
patch("metadata_service.stale_metadata_report.yaml") as mock_yaml,
patch("metadata_service.stale_metadata_report.storage.Blob", new=Mock),
):
mock_storage_client = Mock()
mock_bucket = Mock()