qa-engine: implement fetch_adoption_metrics_per_connector_version (#21840)
This commit is contained in:
2
.github/workflows/run-qa-engine.yml
vendored
2
.github/workflows/run-qa-engine.yml
vendored
@@ -27,4 +27,6 @@ jobs:
|
||||
- name: Install ci-connector-ops package
|
||||
run: pip install --quiet -e ./tools/ci_connector_ops
|
||||
- name: Run QA Engine
|
||||
env:
|
||||
QA_ENGINE_AIRBYTE_DATA_PROD_SA: '${{ secrets.QA_ENGINE_AIRBYTE_DATA_PROD_SA }}'
|
||||
run: run-qa-engine
|
||||
|
||||
@@ -0,0 +1,46 @@
|
||||
WITH official_connector_syncs AS (
|
||||
SELECT * FROM airbyte_warehouse.connector_sync WHERE is_officially_published and (job_status = "failed" or job_status = "succeeded")
|
||||
),
|
||||
adoption_per_version AS (
|
||||
SELECT
|
||||
connector_definition_id,
|
||||
docker_repository,
|
||||
connector_version,
|
||||
COUNT(distinct(user_id)) as number_of_users,
|
||||
COUNT(distinct(connection_id)) as number_of_connections
|
||||
FROM
|
||||
official_connector_syncs
|
||||
GROUP BY connector_definition_id, docker_repository, connector_version
|
||||
),
|
||||
job_status_per_version AS (
|
||||
SELECT
|
||||
connector_definition_id,
|
||||
docker_repository,
|
||||
connector_version,
|
||||
job_status,
|
||||
COUNT(1) as sync_count
|
||||
FROM official_connector_syncs
|
||||
GROUP BY connector_definition_id, docker_repository, connector_version, job_status
|
||||
),
|
||||
success_failure_by_connector_version AS (
|
||||
SELECT
|
||||
connector_definition_id,
|
||||
docker_repository,
|
||||
connector_version,
|
||||
ifnull(failed, 0) as failed_syncs_count,
|
||||
ifnull(succeeded, 0) as succeeded_syncs_count,
|
||||
ifnull(succeeded, 0) + ifnull(failed, 0) as total_syncs_count,
|
||||
SAFE_DIVIDE(ifnull(succeeded, 0), ifnull(succeeded, 0) + ifnull(failed, 0)) as sync_success_rate
|
||||
FROM job_status_per_version
|
||||
PIVOT
|
||||
(
|
||||
max(sync_count)
|
||||
FOR job_status in ('failed', 'succeeded')
|
||||
)
|
||||
)
|
||||
SELECT
|
||||
*
|
||||
FROM
|
||||
adoption_per_version
|
||||
LEFT JOIN success_failure_by_connector_version
|
||||
USING (connector_definition_id, docker_repository, connector_version);
|
||||
@@ -2,7 +2,11 @@
|
||||
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
|
||||
#
|
||||
|
||||
import os
|
||||
from importlib.resources import files
|
||||
import json
|
||||
|
||||
from google.oauth2 import service_account
|
||||
import requests
|
||||
import pandas as pd
|
||||
|
||||
@@ -31,18 +35,21 @@ def fetch_adoption_metrics_per_connector_version() -> pd.DataFrame:
|
||||
"""Retrieve adoptions metrics for each connector version from our data warehouse.
|
||||
|
||||
Returns:
|
||||
pd.DataFrame: A Dataframe with adoption metrics per connector version.
|
||||
pd.DataFrame: A dataframe with adoption metrics per connector version.
|
||||
"""
|
||||
# TODO: directly query BigQuery
|
||||
# use query in https://airbyte.metabaseapp.com/question/1642-adoption-and-success-rate-per-connector-version-oss-cloud
|
||||
return pd.DataFrame(columns=[
|
||||
connector_adoption_sql = files("ci_connector_ops.qa_engine").joinpath("connector_adoption.sql").read_text()
|
||||
bq_credentials = service_account.Credentials.from_service_account_info(json.loads(os.environ["QA_ENGINE_AIRBYTE_DATA_PROD_SA"]))
|
||||
adoption_metrics = pd.read_gbq(connector_adoption_sql, project_id="airbyte-data-prod", credentials=bq_credentials)
|
||||
return adoption_metrics[[
|
||||
"connector_definition_id",
|
||||
"connector_version",
|
||||
"number_of_connections",
|
||||
"number_of_users",
|
||||
"succeeded_syncs_count",
|
||||
"failed_syncs_count",
|
||||
"total_syncs_count",
|
||||
"sync_success_rate",
|
||||
])
|
||||
]]
|
||||
|
||||
CLOUD_CATALOG = fetch_remote_catalog(CLOUD_CATALOG_URL)
|
||||
OSS_CATALOG = fetch_remote_catalog(OSS_CATALOG_URL)
|
||||
ADOPTION_METRICS_PER_CONNECTOR_VERSION = fetch_adoption_metrics_per_connector_version()
|
||||
|
||||
@@ -10,6 +10,7 @@ MAIN_REQUIREMENTS = [
|
||||
"PyYAML~=6.0",
|
||||
"GitPython~=3.1.29",
|
||||
"pandas~=1.5.3",
|
||||
"pandas-gbq~=0.19.0",
|
||||
"pydantic~=1.10.4",
|
||||
"fsspec~=2023.1.0",
|
||||
"gcsfs~=2023.1.0"
|
||||
@@ -22,7 +23,7 @@ TEST_REQUIREMENTS = [
|
||||
|
||||
|
||||
setup(
|
||||
version="0.1.4",
|
||||
version="0.1.5",
|
||||
name="ci_connector_ops",
|
||||
description="Packaged maintained by the connector operations team to perform CI for connectors",
|
||||
author="Airbyte",
|
||||
@@ -33,6 +34,7 @@ setup(
|
||||
"tests": TEST_REQUIREMENTS,
|
||||
},
|
||||
python_requires=">=3.9",
|
||||
package_data={"ci_connector_ops.qa_engine": ["connector_adoption.sql"]},
|
||||
entry_points={
|
||||
"console_scripts": [
|
||||
"check-test-strictness-level = ci_connector_ops.sat_config_checks:check_test_strictness_level",
|
||||
|
||||
@@ -3,6 +3,8 @@
|
||||
#
|
||||
|
||||
|
||||
from importlib.resources import files
|
||||
|
||||
import pandas as pd
|
||||
import pytest
|
||||
|
||||
@@ -16,15 +18,41 @@ def test_fetch_remote_catalog(catalog_url):
|
||||
assert all(expected_column in catalog.columns for expected_column in expected_columns)
|
||||
assert set(catalog.connector_type.unique()) == {"source", "destination"}
|
||||
|
||||
def test_fetch_adoption_metrics_per_connector_version():
|
||||
def test_fetch_adoption_metrics_per_connector_version(mocker):
|
||||
fake_bigquery_results = pd.DataFrame([{
|
||||
"connector_definition_id": "abcdefgh",
|
||||
"connector_version": "0.0.0",
|
||||
"number_of_connections": 10,
|
||||
"number_of_users": 2,
|
||||
"succeeded_syncs_count": 12,
|
||||
"failed_syncs_count": 1,
|
||||
"total_syncs_count": 3,
|
||||
"sync_success_rate": .99,
|
||||
"unexpected_column": "foobar"
|
||||
}])
|
||||
mocker.patch.object(inputs.pd, "read_gbq", mocker.Mock(return_value=fake_bigquery_results))
|
||||
mocker.patch.object(inputs.os, "environ", {"QA_ENGINE_AIRBYTE_DATA_PROD_SA": '{"type": "fake_service_account"}'})
|
||||
mocker.patch.object(inputs.service_account.Credentials, "from_service_account_info")
|
||||
expected_columns = {
|
||||
"connector_definition_id",
|
||||
"connector_version",
|
||||
"number_of_connections",
|
||||
"number_of_users",
|
||||
"succeeded_syncs_count",
|
||||
"failed_syncs_count",
|
||||
"total_syncs_count",
|
||||
"sync_success_rate",
|
||||
}
|
||||
|
||||
expected_sql_query = files("ci_connector_ops.qa_engine").joinpath("connector_adoption.sql").read_text()
|
||||
expected_project_id = "airbyte-data-prod"
|
||||
adoption_metrics_per_connector_version = inputs.fetch_adoption_metrics_per_connector_version()
|
||||
assert len(adoption_metrics_per_connector_version) == 0
|
||||
assert isinstance(adoption_metrics_per_connector_version, pd.DataFrame)
|
||||
assert set(adoption_metrics_per_connector_version.columns) == expected_columns
|
||||
inputs.service_account.Credentials.from_service_account_info.assert_called_with(
|
||||
{"type": "fake_service_account"}
|
||||
)
|
||||
inputs.pd.read_gbq.assert_called_with(
|
||||
expected_sql_query,
|
||||
project_id=expected_project_id,
|
||||
credentials=inputs.service_account.Credentials.from_service_account_info.return_value
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user