
connector-ci: better and lighter logging (#27606)

This commit is contained in:
Augustin
2023-06-24 11:17:46 +02:00
committed by GitHub
parent f1f088d3c6
commit d3c33a6321
18 changed files with 288 additions and 182 deletions

View File

@@ -124,3 +124,4 @@ runs:
SPEC_CACHE_GCS_CREDENTIALS: ${{ inputs.spec_cache_gcs_credentials }}
DOCKER_HUB_USERNAME: ${{ inputs.docker_hub_username }}
DOCKER_HUB_PASSWORD: ${{ inputs.docker_hub_password }}
CI: "True"

View File

@@ -62,16 +62,16 @@ N.B: This project will eventually be moved to `airbyte-ci` root directory.
#### Options
| Option | Default value | Mapped environment variable | Description |
| ---------------------------- | ------------------------------- | ----------------------------- | ------------------------------------------------------------------------------------------- |
| `--is-local/--is-ci` | `--is-local` | | Determines the environment in which the CLI runs: local environment or CI environment. |
| `--git-branch` | The checked out git branch name | `CI_GIT_BRANCH` | The git branch on which the pipelines will run. |
| `--git-revision` | The current branch head | `CI_GIT_REVISION` | The commit hash on which the pipelines will run. |
| `--diffed-branch` | `origin/master` | | Branch to which the git diff will happen to detect new or modified files. |
| `--gha-workflow-run-id` | | | GHA CI only - The run id of the GitHub action workflow |
| `--ci-context` | `manual` | | The current CI context: `manual` for manual run, `pull_request`, `nightly_builds`, `master` |
| `--pipeline-start-timestamp` | Current epoch time | `CI_PIPELINE_START_TIMESTAMP` | Start time of the pipeline as epoch time. Used for pipeline run duration computation. |
| Option | Default value | Mapped environment variable | Description |
| --------------------------------------- | ------------------------------- | ----------------------------- | ------------------------------------------------------------------------------------------- |
| `--is-local/--is-ci` | `--is-local` | | Determines the environment in which the CLI runs: local environment or CI environment. |
| `--git-branch` | The checked out git branch name | `CI_GIT_BRANCH` | The git branch on which the pipelines will run. |
| `--git-revision` | The current branch head | `CI_GIT_REVISION` | The commit hash on which the pipelines will run. |
| `--diffed-branch` | `origin/master` | | Branch to which the git diff will happen to detect new or modified files. |
| `--gha-workflow-run-id` | | | GHA CI only - The run id of the GitHub action workflow |
| `--ci-context` | `manual` | | The current CI context: `manual` for manual run, `pull_request`, `nightly_builds`, `master` |
| `--pipeline-start-timestamp` | Current epoch time | `CI_PIPELINE_START_TIMESTAMP` | Start time of the pipeline as epoch time. Used for pipeline run duration computation. |
| `--show-dagger-logs/--hide-dagger-logs` | `--hide-dagger-logs` | | Flag to show or hide the dagger logs. |
### <a id="connectors-command-subgroup"></a>`connectors` command subgroup
@@ -248,17 +248,17 @@ Publish all connectors modified in the head commit: `airbyte-ci connectors --mod
### Options
| Option | Required | Default | Mapped environment variable | Description |
| ------------------------------------ | -------- | --------------- | ---------------------------------- | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| Option | Required | Default | Mapped environment variable | Description |
| ------------------------------------ | -------- | --------------- | ---------------------------------- | ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `--pre-release/--main-release` | False | `--pre-release` | | Whether to publish the pre-release or the main release version of a connector. Defaults to pre-release. For main release you have to set the credentials to interact with the GCS bucket. |
| `--docker-hub-username` | True | | `DOCKER_HUB_USERNAME` | Your username to connect to DockerHub. |
| `--docker-hub-password` | True | | `DOCKER_HUB_PASSWORD` | Your password to connect to DockerHub. |
| `--spec-cache-gcs-credentials` | False | | `SPEC_CACHE_GCS_CREDENTIALS` | The service account key to upload files to the GCS bucket hosting spec cache. |
| `--spec-cache-bucket-name` | False | | `SPEC_CACHE_BUCKET_NAME` | The name of the GCS bucket where specs will be cached. |
| `--metadata-service-gcs-credentials` | False | | `METADATA_SERVICE_GCS_CREDENTIALS` | The service account key to upload files to the GCS bucket hosting the metadata files. |
| `--metadata-service-bucket-name` | False | | `METADATA_SERVICE_BUCKET_NAME` | The name of the GCS bucket where metadata files will be uploaded. |
| `--slack-webhook` | False | | `SLACK_WEBHOOK` | The Slack webhook URL to send notifications to. |
| `--slack-channel` | False | | `SLACK_CHANNEL` | The Slack channel name to send notifications to. |
| `--docker-hub-username` | True | | `DOCKER_HUB_USERNAME` | Your username to connect to DockerHub. |
| `--docker-hub-password` | True | | `DOCKER_HUB_PASSWORD` | Your password to connect to DockerHub. |
| `--spec-cache-gcs-credentials` | False | | `SPEC_CACHE_GCS_CREDENTIALS` | The service account key to upload files to the GCS bucket hosting spec cache. |
| `--spec-cache-bucket-name` | False | | `SPEC_CACHE_BUCKET_NAME` | The name of the GCS bucket where specs will be cached. |
| `--metadata-service-gcs-credentials` | False | | `METADATA_SERVICE_GCS_CREDENTIALS` | The service account key to upload files to the GCS bucket hosting the metadata files. |
| `--metadata-service-bucket-name` | False | | `METADATA_SERVICE_BUCKET_NAME` | The name of the GCS bucket where metadata files will be uploaded. |
| `--slack-webhook` | False | | `SLACK_WEBHOOK` | The Slack webhook URL to send notifications to. |
| `--slack-channel` | False | | `SLACK_CHANNEL` | The Slack channel name to send notifications to. |
#### What it runs

View File

@@ -4,7 +4,18 @@
"""The pipelines package."""
import logging
import os
from rich.logging import RichHandler
logging.getLogger("requests").setLevel(logging.WARNING)
logging.getLogger("urllib3").setLevel(logging.WARNING)
logging.getLogger("httpx").setLevel(logging.WARNING)
logging_handlers = [RichHandler(rich_tracebacks=True)]
if "CI" in os.environ:
# RichHandler does not work well in CI
logging_handlers = [logging.StreamHandler()]
logging.basicConfig(level=logging.INFO, format="%(name)s: %(message)s", datefmt="[%X]", handlers=logging_handlers)
main_logger = logging.getLogger(__name__)
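The hunk above centralizes logging configuration in the package `__init__.py`: noisy HTTP libraries are silenced, `RichHandler` is used locally, and a plain `StreamHandler` is used when the `CI` environment variable is set. A minimal usage sketch, assuming the package layout above — downstream modules import the shared `main_logger` instead of calling `click.echo` or configuring logging themselves (the messages are illustrative):

```python
# Minimal usage sketch; assumes the package layout above, messages are illustrative.
from ci_connector_ops.pipelines import main_logger

main_logger.info("Running airbyte-ci in CI mode.")
main_logger.warning("No connectors were selected for testing.")
```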

View File

@@ -466,12 +466,10 @@ async def load_image_to_docker_host(context: ConnectorContext, tar_file: File, i
docker_cli = with_docker_cli(context).with_mounted_file(tar_name, tar_file)
image_load_output = await docker_cli.with_exec(["docker", "load", "--input", tar_name]).stdout()
context.logger.info(image_load_output)
# Untagged images only have a sha256 image ID, which the load output includes.
if "sha256:" in image_load_output:
image_id = image_load_output.replace("\n", "").replace("Loaded image ID: sha256:", "")
docker_tag_output = await docker_cli.with_exec(["docker", "tag", image_id, image_tag]).stdout()
context.logger.info(docker_tag_output)
await docker_cli.with_exec(["docker", "tag", image_id, image_tag]).exit_code()
def with_poetry(context: PipelineContext) -> Container:

View File

@@ -7,10 +7,11 @@
from __future__ import annotations
import json
import logging
import webbrowser
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from datetime import datetime
from datetime import datetime, timedelta
from enum import Enum
from typing import TYPE_CHECKING, Any, ClassVar, List, Optional
@@ -19,7 +20,7 @@ import asyncer
from anyio import Path
from ci_connector_ops.pipelines.actions import remote_storage
from ci_connector_ops.pipelines.consts import LOCAL_REPORTS_PATH_ROOT, PYPROJECT_TOML_FILE_PATH
from ci_connector_ops.pipelines.utils import check_path_in_workdir, slugify, with_exit_code, with_stderr, with_stdout
from ci_connector_ops.pipelines.utils import check_path_in_workdir, format_duration, slugify, with_exit_code, with_stderr, with_stdout
from ci_connector_ops.utils import console
from dagger import Container, QueryError
from jinja2 import Environment, PackageLoader, select_autoescape
@@ -99,12 +100,43 @@ class Step(ABC):
"""An abstract class to declare and run pipeline step."""
title: ClassVar[str]
started_at: ClassVar[datetime]
max_retries: ClassVar[int] = 0
should_log: ClassVar[bool] = True
def __init__(self, context: PipelineContext) -> None: # noqa D107
self.context = context
self.retry_count = 0
self.started_at = None
self.stopped_at = None
@property
def run_duration(self) -> timedelta:
if self.started_at and self.stopped_at:
return self.stopped_at - self.started_at
else:
return timedelta(seconds=0)
@property
def logger(self) -> logging.Logger:
if self.should_log:
return self.context.logger
else:
disabled_logger = logging.getLogger()
disabled_logger.disabled = True
return disabled_logger
async def log_progress(self, completion_event) -> None:
while not completion_event.is_set():
duration = datetime.utcnow() - self.started_at
elapsed_seconds = duration.total_seconds()
if elapsed_seconds > 30 and round(elapsed_seconds) % 30 == 0:
self.logger.info(f"⏳ Still running {self.title}... (duration: {format_duration(duration)})")
await anyio.sleep(1)
async def run_with_completion(self, completion_event, *args, **kwargs) -> StepResult:
result = await self._run(*args, **kwargs)
completion_event.set()
return result
async def run(self, *args, **kwargs) -> StepResult:
"""Public method to run the step. It output a step result.
@@ -114,20 +146,43 @@ class Step(ABC):
Returns:
StepResult: The step result following the step run.
"""
self.started_at = datetime.utcnow()
try:
result = await self._run(*args, **kwargs)
self.started_at = datetime.utcnow()
self.logger.info(f"🚀 Start {self.title}")
completion_event = anyio.Event()
async with asyncer.create_task_group() as task_group:
soon_result = task_group.soonify(self.run_with_completion)(completion_event, *args, **kwargs)
task_group.soonify(self.log_progress)(completion_event)
result = soon_result.value
if result.status is StepStatus.FAILURE and self.retry_count <= self.max_retries and self.max_retries > 0:
self.retry_count += 1
await anyio.sleep(10)
self.context.logger.warn(
f"Retry #{self.retry_count} for {self.title} step on connector {self.context.connector.technical_name}"
)
self.logger.warn(f"Retry #{self.retry_count} for {self.title} step on connector {self.context.connector.technical_name}.")
return await self.run(*args, **kwargs)
self.stopped_at = datetime.utcnow()
self.log_step_result(result)
return result
except QueryError as e:
self.stopped_at = datetime.utcnow()
self.logger.error(f"QueryError on step {self.title}: {e}")
return StepResult(self, StepStatus.FAILURE, stderr=str(e))
def log_step_result(self, result: StepResult) -> None:
"""Log the step result.
Args:
result (StepResult): The step result to log.
"""
duration = format_duration(self.run_duration)
if result.status is StepStatus.FAILURE:
self.logger.error(f"{result.status.get_emoji()} {self.title} failed (duration: {duration})")
if result.status is StepStatus.SKIPPED:
self.logger.info(f"{result.status.get_emoji()} {self.title} was skipped (duration: {duration})")
if result.status is StepStatus.SUCCESS:
self.logger.info(f"{result.status.get_emoji()} {self.title} was successful (duration: {duration})")
@abstractmethod
async def _run(self, *args, **kwargs) -> StepResult:
"""Implement the execution of the step and return a step result.
@@ -179,8 +234,9 @@ class PytestStep(Step, ABC):
"""Return the path to the pytest log file."""
log_directory = Path(f"{self.context.connector.code_directory}/airbyte_ci_logs")
await log_directory.mkdir(exist_ok=True)
await Path(f"{log_directory}/{slugify(self.title).replace('-', '_')}.log").write_text(logs)
self.context.logger.info(f"Pytest logs written to {log_directory}/{slugify(self.title)}.log")
log_path = await (log_directory / f"{slugify(self.title).replace('-', '_')}.log").resolve()
await log_path.write_text(logs)
self.logger.info(f"Pytest logs written to {log_path}")
# TODO this is not very robust if pytest crashes and does not output its expected last log line.
def pytest_logs_to_step_result(self, logs: str) -> StepResult:
@@ -240,6 +296,7 @@ class NoOpStep(Step):
"""A step that does nothing."""
title = "No Op"
should_log = False
def __init__(self, context: PipelineContext, step_status: StepStatus) -> None:
super().__init__(context)
@@ -346,17 +403,21 @@ class Report:
gcs_credentials=self.pipeline_context.ci_gcs_credentials_secret,
flags=gcs_cp_flags,
)
gcs_uri = "gs://" + self.pipeline_context.ci_report_bucket + "/" + remote_key
public_url = f"https://storage.googleapis.com/{self.pipeline_context.ci_report_bucket}/{remote_key}"
if report_upload_exit_code != 0:
self.pipeline_context.logger.error(f"Uploading {local_path} to GCS Bucket: {self.pipeline_context.ci_report_bucket} failed.")
self.pipeline_context.logger.error(f"Uploading {local_path} to {gcs_uri} failed.")
else:
self.pipeline_context.logger.info(f"Uploading {local_path} to {gcs_uri} succeeded. Public URL: {public_url}")
return report_upload_exit_code
async def save(self) -> None:
"""Save the report files."""
local_json_path = await self.save_local(self.json_report_file_name, self.to_json())
self.pipeline_context.logger.info(f"Report saved locally at {local_json_path}")
absolute_path = await local_json_path.absolute()
self.pipeline_context.logger.info(f"Report saved locally at {absolute_path}")
if self.remote_storage_enabled:
await self.save_remote(local_json_path, self.json_report_remote_storage_key, "application/json")
self.pipeline_context.logger.info(f"Report saved remotely at {self.json_report_remote_storage_key}")
def to_json(self) -> str:
"""Create a JSON representation of the report.
@@ -504,6 +565,7 @@ class ConnectorReport(Report):
)
template = env.get_template("test_report.html.j2")
template.globals["StepStatus"] = StepStatus
template.globals["format_duration"] = format_duration
local_icon_path = await Path(f"{self.pipeline_context.connector.code_directory}/icon.svg").resolve()
template_context = {
"connector_name": self.pipeline_context.connector.technical_name,
@@ -528,9 +590,10 @@ class ConnectorReport(Report):
async def save(self) -> None:
local_html_path = await self.save_local(self.html_report_file_name, await self.to_html())
absolute_path = await local_html_path.resolve()
if self.pipeline_context.is_local:
absolute_path = await local_html_path.resolve()
self.pipeline_context.logger.info(f"Opening HTML report in browser: {absolute_path}")
self.pipeline_context.logger.info(f"HTML report saved locally: {absolute_path}")
self.pipeline_context.logger.info("Opening HTML report in browser.")
webbrowser.open(absolute_path.as_uri())
if self.remote_storage_enabled:
await self.save_remote(local_html_path, self.html_report_remote_storage_key, "text/html")
@@ -546,26 +609,17 @@ class ConnectorReport(Report):
step_results_table = Table(title="Steps results")
step_results_table.add_column("Step")
step_results_table.add_column("Result")
step_results_table.add_column("Finished after")
step_results_table.add_column("Duration")
for step_result in self.steps_results:
step = Text(step_result.step.title)
step.stylize(step_result.status.get_rich_style())
result = Text(step_result.status.value)
result.stylize(step_result.status.get_rich_style())
step_results_table.add_row(step, result, f"{round((self.created_at - step_result.created_at).total_seconds())}s")
step_results_table.add_row(step, result, format_duration(step_result.step.run_duration))
to_render = [step_results_table]
if self.failed_steps:
sub_panels = []
for failed_step in self.failed_steps:
errors = Text(failed_step.stderr)
panel_title = Text(f"{connector_name} {failed_step.step.title.lower()} failures")
panel_title.stylize(Style(color="red", bold=True))
sub_panel = Panel(errors, title=panel_title)
sub_panels.append(sub_panel)
failures_group = Group(*sub_panels)
to_render.append(failures_group)
details_instructions = Text(" You can find more details, including step execution logs, in the saved HTML report.")
to_render = [step_results_table, details_instructions]
main_panel = Panel(Group(*to_render), title=main_panel_title, subtitle=duration_subtitle)
console.print(main_panel)

View File

@@ -7,7 +7,7 @@
from typing import List
import click
from ci_connector_ops.pipelines import github
from ci_connector_ops.pipelines import github, main_logger
from ci_connector_ops.pipelines.bases import CIContext
from ci_connector_ops.pipelines.utils import (
get_current_epoch_time,
@@ -69,7 +69,15 @@ def get_modified_files(
@click.option("--ci-git-user", default="octavia-squidington-iii", envvar="CI_GIT_USER", type=str)
@click.option("--ci-github-access-token", envvar="CI_GITHUB_ACCESS_TOKEN", type=str)
@click.option("--ci-report-bucket-name", envvar="CI_REPORT_BUCKET_NAME", type=str)
@click.option(
"--ci-gcs-credentials",
help="The service account to use during CI.",
type=click.STRING,
required=False, # Not required for pre-release or local pipelines
envvar="GCP_GSM_CREDENTIALS",
)
@click.option("--ci-job-key", envvar="CI_JOB_KEY", type=str)
@click.option("--show-dagger-logs/--hide-dagger-logs", default=False, type=bool)
@click.pass_context
def airbyte_ci(
ctx: click.Context,
@@ -84,7 +92,9 @@ def airbyte_ci(
ci_git_user: str,
ci_github_access_token: str,
ci_report_bucket_name: str,
ci_gcs_credentials: str,
ci_job_key: str,
show_dagger_logs: bool,
): # noqa D103
ctx.ensure_object(dict)
ctx.obj["is_local"] = is_local
@@ -97,10 +107,12 @@ def airbyte_ci(
)
ctx.obj["ci_context"] = ci_context
ctx.obj["ci_report_bucket_name"] = ci_report_bucket_name
ctx.obj["ci_gcs_credentials"] = ci_gcs_credentials
ctx.obj["ci_git_user"] = ci_git_user
ctx.obj["ci_github_access_token"] = ci_github_access_token
ctx.obj["ci_job_key"] = ci_job_key
ctx.obj["pipeline_start_timestamp"] = pipeline_start_timestamp
ctx.obj["show_dagger_logs"] = show_dagger_logs
if pull_request_number and ci_github_access_token:
ctx.obj["pull_request"] = github.get_pull_request(pull_request_number, ci_github_access_token)
@@ -110,16 +122,16 @@ def airbyte_ci(
ctx.obj["modified_files"] = get_modified_files(git_branch, git_revision, diffed_branch, is_local, ci_context, ctx.obj["pull_request"])
if not is_local:
click.echo("Running airbyte-ci in CI mode.")
click.echo(f"CI Context: {ci_context}")
click.echo(f"CI Report Bucket Name: {ci_report_bucket_name}")
click.echo(f"Git Branch: {git_branch}")
click.echo(f"Git Revision: {git_revision}")
click.echo(f"GitHub Workflow Run ID: {gha_workflow_run_id}")
click.echo(f"GitHub Workflow Run URL: {ctx.obj['gha_workflow_run_url']}")
click.echo(f"Pull Request Number: {pull_request_number}")
click.echo(f"Pipeline Start Timestamp: {pipeline_start_timestamp}")
click.echo(f"Modified Files: {ctx.obj['modified_files']}")
main_logger.info("Running airbyte-ci in CI mode.")
main_logger.info(f"CI Context: {ci_context}")
main_logger.info(f"CI Report Bucket Name: {ci_report_bucket_name}")
main_logger.info(f"Git Branch: {git_branch}")
main_logger.info(f"Git Revision: {git_revision}")
main_logger.info(f"GitHub Workflow Run ID: {gha_workflow_run_id}")
main_logger.info(f"GitHub Workflow Run URL: {ctx.obj['gha_workflow_run_url']}")
main_logger.info(f"Pull Request Number: {pull_request_number}")
main_logger.info(f"Pipeline Start Timestamp: {pipeline_start_timestamp}")
main_logger.info(f"Modified Files: {ctx.obj['modified_files']}")
airbyte_ci.add_command(connectors)

View File

@@ -4,7 +4,6 @@
"""This module declares the CLI commands to run the connectors CI pipelines."""
import logging
import os
import sys
from pathlib import Path
@@ -12,6 +11,7 @@ from typing import Any, Dict, Tuple
import anyio
import click
from ci_connector_ops.pipelines import main_logger
from ci_connector_ops.pipelines.builds import run_connector_build_pipeline
from ci_connector_ops.pipelines.contexts import ConnectorContext, ContextState, PublishConnectorContext
from ci_connector_ops.pipelines.format import run_connectors_format_pipelines
@@ -19,17 +19,11 @@ from ci_connector_ops.pipelines.github import update_global_commit_status_check_
from ci_connector_ops.pipelines.pipelines.connectors import run_connectors_pipelines
from ci_connector_ops.pipelines.publish import reorder_contexts, run_connector_publish_pipeline
from ci_connector_ops.pipelines.tests import run_connector_test_pipeline
from ci_connector_ops.pipelines.utils import DaggerPipelineCommand, get_modified_connectors, get_modified_metadata_files, slugify
from ci_connector_ops.pipelines.utils import DaggerPipelineCommand, get_modified_connectors, get_modified_metadata_files
from ci_connector_ops.utils import ConnectorLanguage, console, get_all_released_connectors
from rich.logging import RichHandler
from rich.table import Table
from rich.text import Text
logging.basicConfig(level=logging.INFO, format="%(name)s: %(message)s", datefmt="[%X]", handlers=[RichHandler(rich_tracebacks=True)])
logger = logging.getLogger(__name__)
# HELPERS
@@ -53,57 +47,11 @@ def validate_environment(is_local: bool, use_remote_secrets: bool):
)
def render_report_output_prefix(ctx: click.Context) -> str:
"""Render the report output prefix for any command in the Connector CLI.
The goal is to standardize the output of all logs and reports generated by the CLI
related to a specific command, and to a specific CI context.
Note: We cannot hoist this higher in the command hierarchy because only one level of
subcommands is available at the time the context is created.
"""
git_branch = ctx.obj["git_branch"]
git_revision = ctx.obj["git_revision"]
pipeline_start_timestamp = ctx.obj["pipeline_start_timestamp"]
ci_context = ctx.obj["ci_context"]
ci_job_key = ctx.obj["ci_job_key"] if ctx.obj.get("ci_job_key") else ci_context
sanitized_branch = slugify(git_branch.replace("/", "_"))
# get the command name for the current context, if a group then prepend the parent command name
invoked_subcommand = ctx.invoked_subcommand
parent_command_path = ctx.command_path.replace(" ", "/") if ctx.command_path else None
cmd = f"{parent_command_path}/{invoked_subcommand}" if parent_command_path else invoked_subcommand
path_values = [
cmd,
ci_job_key,
sanitized_branch,
pipeline_start_timestamp,
git_revision,
]
# check all values are defined
if None in path_values:
raise ValueError(f"Missing value required to render the report output prefix: {path_values}")
# join all values with a slash, and convert all values to string
return "/".join(map(str, path_values))
# COMMANDS
@click.group(help="Commands related to connectors and connector acceptance tests.")
@click.option("--use-remote-secrets", default=True) # specific to connectors
@click.option(
"--ci-gcs-credentials",
help="The service account to use during CI.",
type=click.STRING,
required=False, # Not required for pre-release or local pipelines
envvar="GCP_GSM_CREDENTIALS",
)
@click.option(
"--name", "names", multiple=True, help="Only test a specific connector. Use its technical name. e.g source-pokeapi.", type=str
)
@@ -127,7 +75,6 @@ def render_report_output_prefix(ctx: click.Context) -> str:
def connectors(
ctx: click.Context,
use_remote_secrets: bool,
ci_gcs_credentials: str,
names: Tuple[str],
languages: Tuple[ConnectorLanguage],
release_stages: Tuple[str],
@@ -140,14 +87,12 @@ def connectors(
ctx.ensure_object(dict)
ctx.obj["use_remote_secrets"] = use_remote_secrets
ctx.obj["ci_gcs_credentials"] = ci_gcs_credentials
ctx.obj["connector_names"] = names
ctx.obj["connector_languages"] = languages
ctx.obj["release_states"] = release_stages
ctx.obj["modified"] = modified
ctx.obj["concurrency"] = concurrency
ctx.obj["execute_timeout"] = execute_timeout
ctx.obj["report_output_prefix"] = render_report_output_prefix(ctx)
all_connectors = get_all_released_connectors()
@@ -193,14 +138,14 @@ def test(
ctx (click.Context): The click context.
"""
if ctx.obj["is_ci"] and ctx.obj["pull_request"] and ctx.obj["pull_request"].draft:
click.echo("Skipping connectors tests for draft pull request.")
main_logger.info("Skipping connectors tests for draft pull request.")
sys.exit(0)
click.secho(f"Will run the test pipeline for the following connectors: {', '.join(ctx.obj['selected_connectors_names'])}.", fg="green")
main_logger.info(f"Will run the test pipeline for the following connectors: {', '.join(ctx.obj['selected_connectors_names'])}")
if ctx.obj["selected_connectors_and_files"]:
update_global_commit_status_check_for_tests(ctx.obj, "pending")
else:
click.secho("No connector were selected for testing.", fg="yellow")
main_logger.warn("No connector were selected for testing.")
update_global_commit_status_check_for_tests(ctx.obj, "success")
return True
@@ -220,7 +165,6 @@ def test(
ci_context=ctx.obj.get("ci_context"),
pull_request=ctx.obj.get("pull_request"),
ci_gcs_credentials=ctx.obj["ci_gcs_credentials"],
should_save_report=True,
)
for connector, modified_files in ctx.obj["selected_connectors_and_files"].items()
]
@@ -231,15 +175,20 @@ def test(
run_connector_test_pipeline,
"Test Pipeline",
ctx.obj["concurrency"],
ctx.obj["dagger_logs_path"],
ctx.obj["execute_timeout"],
)
except Exception as e:
click.secho(str(e), err=True, fg="red")
main_logger.error("An error occurred while running the test pipeline", exc_info=e)
update_global_commit_status_check_for_tests(ctx.obj, "failure")
return False
global_success = all(connector_context.state is ContextState.SUCCESSFUL for connector_context in connectors_tests_contexts)
update_global_commit_status_check_for_tests(ctx.obj, "success" if global_success else "failure")
@ctx.call_on_close
def send_commit_status_check() -> None:
if ctx.obj["is_ci"]:
global_success = all(connector_context.state is ContextState.SUCCESSFUL for connector_context in connectors_tests_contexts)
update_global_commit_status_check_for_tests(ctx.obj, "success" if global_success else "failure")
# If we reach this point, it means that all the connectors have been tested so the pipeline did its job and can exit with success.
return True
@@ -247,10 +196,10 @@ def test(
@connectors.command(cls=DaggerPipelineCommand, help="Build all images for the selected connectors.")
@click.pass_context
def build(ctx: click.Context) -> bool:
click.secho(f"Will build the following connectors: {', '.join(ctx.obj['selected_connectors_names'])}.", fg="green")
main_logger.info(f"Will build the following connectors: {', '.join(ctx.obj['selected_connectors_names'])}.")
connectors_contexts = [
ConnectorContext(
pipeline_name="Build connector {connector.technical_name}",
pipeline_name=f"Build connector {connector.technical_name}",
connector=connector,
is_local=ctx.obj["is_local"],
git_branch=ctx.obj["git_branch"],
@@ -272,6 +221,7 @@ def build(ctx: click.Context) -> bool:
run_connector_build_pipeline,
"Build Pipeline",
ctx.obj["concurrency"],
ctx.obj["dagger_logs_path"],
ctx.obj["execute_timeout"],
)
@@ -366,8 +316,7 @@ def publish(
selected_connectors_and_files = ctx.obj["selected_connectors_and_files"]
selected_connectors_names = ctx.obj["selected_connectors_names"]
click.secho(f"Will publish the following connectors: {', '.join(selected_connectors_names)}.", fg="green")
main_logger.info(f"Will publish the following connectors: {', '.join(selected_connectors_names)}")
publish_connector_contexts = reorder_contexts(
[
PublishConnectorContext(
@@ -397,14 +346,16 @@ def publish(
]
)
click.secho("Concurrency is forced to 1. For stability reasons we disable parallel publish pipelines.", fg="yellow")
main_logger.warn("Concurrency is forced to 1. For stability reasons we disable parallel publish pipelines.")
ctx.obj["concurrency"] = 1
publish_connector_contexts = anyio.run(
run_connectors_pipelines,
publish_connector_contexts,
run_connector_publish_pipeline,
"Publish pipeline",
"Publishing connectors",
ctx.obj["concurrency"],
ctx.obj["dagger_logs_path"],
ctx.obj["execute_timeout"],
)
return all(context.state is ContextState.SUCCESSFUL for context in publish_connector_contexts)
@@ -466,13 +417,11 @@ def format(ctx: click.Context) -> bool:
]
if connectors_and_files_to_format:
click.secho(
f"Will format the following connectors: {', '.join([connector.technical_name for connector, _ in connectors_and_files_to_format])}.",
fg="green",
main_logger.info(
f"Will format the following connectors: {', '.join([connector.technical_name for connector, _ in connectors_and_files_to_format])}."
)
else:
click.secho("No connectors to format.", fg="yellow")
main_logger.info("No connectors to format.")
connectors_contexts = [
ConnectorContext(
pipeline_name=f"Format connector {connector.technical_name}",

View File

@@ -1,7 +1,6 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
import logging
import anyio
import click
@@ -16,14 +15,9 @@ from ci_connector_ops.pipelines.pipelines.metadata import (
from ci_connector_ops.pipelines.utils import (
DaggerPipelineCommand,
get_all_metadata_files,
get_modified_metadata_files,
get_expected_metadata_files,
get_modified_metadata_files,
)
from rich.logging import RichHandler
logging.basicConfig(level=logging.INFO, format="%(name)s: %(message)s", datefmt="[%X]", handlers=[RichHandler(rich_tracebacks=True)])
logger = logging.getLogger(__name__)
# MAIN GROUP

View File

@@ -3,6 +3,7 @@
#
import platform
from pathlib import Path
from dagger import Platform
@@ -33,3 +34,4 @@ GRADLE_CACHE_PATH = "/root/.gradle/caches"
GRADLE_BUILD_CACHE_PATH = f"{GRADLE_CACHE_PATH}/build-cache-1"
GRADLE_READ_ONLY_DEPENDENCY_CACHE_PATH = "/root/gradle_dependency_cache"
LOCAL_REPORTS_PATH_ROOT = "tools/ci_connector_ops/pipeline_reports/"
Path(LOCAL_REPORTS_PATH_ROOT).mkdir(parents=True, exist_ok=True)

View File

@@ -297,7 +297,7 @@ class ConnectorContext(PipelineContext):
slack_webhook: Optional[str] = None,
reporting_slack_channel: Optional[str] = None,
pull_request: PullRequest = None,
should_save_report: bool = False,
should_save_report: bool = True,
):
"""Initialize a connector context.

View File

@@ -9,6 +9,7 @@ from __future__ import annotations
import os
from typing import TYPE_CHECKING, Optional
from ci_connector_ops.pipelines import main_logger
from ci_connector_ops.pipelines.bases import CIContext
from ci_connector_ops.utils import console
@@ -28,7 +29,7 @@ def safe_log(logger: Optional[Logger], message: str, level: str = "info") -> Non
log_method = getattr(logger, level.lower())
log_method(message)
else:
console.print(message)
main_logger.info(message)
def update_commit_status_check(

View File

@@ -4,6 +4,7 @@
"""This module groups the functions to run full pipelines for connector testing."""
import sys
from pathlib import Path
from typing import Callable, List, Optional
import anyio
@@ -17,7 +18,6 @@ from dagger import Config
GITHUB_GLOBAL_CONTEXT = "[POC please ignore] Connectors CI"
GITHUB_GLOBAL_DESCRIPTION = "Running connectors tests"
CONNECTOR_LANGUAGE_TO_FORCED_CONCURRENCY_MAPPING = {
# We run the Java connectors tests sequentially because we currently have memory issues when Java integration tests are run in parallel.
# See https://github.com/airbytehq/airbyte/issues/27168
@@ -73,13 +73,15 @@ async def run_connectors_pipelines(
connector_pipeline: Callable,
pipeline_name: str,
concurrency: int,
dagger_logs_path: Optional[Path],
execute_timeout: Optional[int],
*args,
) -> List[ConnectorContext]:
"""Run a connector pipeline for all the connector contexts."""
default_connectors_semaphore = anyio.Semaphore(concurrency)
async with dagger.Connection(Config(log_output=sys.stderr, execute_timeout=execute_timeout)) as dagger_client:
dagger_logs_output = sys.stderr if not dagger_logs_path else open(dagger_logs_path, "w")
async with dagger.Connection(Config(log_output=dagger_logs_output, execute_timeout=execute_timeout)) as dagger_client:
# HACK: This is to get a long-running dockerd service shared across all the connector pipelines.
# Using the "normal" service binding leads to restarts of dockerd during the pipeline run, which can corrupt the docker state.
# See https://github.com/airbytehq/airbyte/issues/27233

View File

@@ -37,7 +37,6 @@ async def run_metadata_validation(context: ConnectorContext) -> List[StepResult]
Returns:
List[StepResult]: The results of the metadata validation steps.
"""
context.logger.info("Run metadata validation.")
return [await MetadataValidation(context, context.connector.code_directory / METADATA_FILE_NAME).run()]
@@ -50,7 +49,6 @@ async def run_version_checks(context: ConnectorContext) -> List[StepResult]:
Returns:
List[StepResult]: The results of the version checks steps.
"""
context.logger.info("Run version checks.")
return [await VersionFollowsSemverCheck(context).run(), await VersionIncrementCheck(context).run()]
@@ -63,7 +61,6 @@ async def run_qa_checks(context: ConnectorContext) -> List[StepResult]:
Returns:
List[StepResult]: The results of the QA checks steps.
"""
context.logger.info("Run QA checks.")
return [await QaChecks(context).run()]
@@ -77,7 +74,6 @@ async def run_code_format_checks(context: ConnectorContext) -> List[StepResult]:
List[StepResult]: The results of the code format checks steps.
"""
if _run_code_format_checks := LANGUAGE_MAPPING["run_code_format_checks"].get(context.connector.language):
context.logger.info("Run code format checks.")
return await _run_code_format_checks(context)
else:
context.logger.warning(f"No code format checks defined for connector language {context.connector.language}!")

View File

@@ -75,7 +75,7 @@ class VersionCheck(Step, ABC):
class VersionIncrementCheck(VersionCheck):
title = "Connector version increment check."
title = "Connector version increment check"
BYPASS_CHECK_FOR = [
METADATA_FILE_NAME,
@@ -110,7 +110,7 @@ class VersionIncrementCheck(VersionCheck):
class VersionFollowsSemverCheck(VersionCheck):
title = "Connector version semver check."
title = "Connector version semver check"
@property
def failure_message(self) -> str:

View File

@@ -128,13 +128,15 @@
<h2>Summary</h2>
<table>
<tr>
<th>Step Name</th>
<th>Step</th>
<th>Status</th>
<th>Duration</th>
</tr>
{% for step_result in step_results %}
<tr>
<td>{{ step_result.step.title }}</td>
<td>{{ step_result.status }}</td>
<td>{{ format_duration(step_result.step.run_duration) }}</td>
</tr>
{% endfor %}
</table>
@@ -143,11 +145,11 @@
<div class="wrap-collabsible">
<input id="{{ step_result.step.title }}" class="toggle" type="checkbox">
{% if step_result.status == StepStatus.SUCCESS %}
<label for="{{ step_result.step.title }}" class="lbl-toggle success">{{ step_result.step.title }}</label>
<label for="{{ step_result.step.title }}" class="lbl-toggle success">{{ step_result.step.title }} | {{ format_duration(step_result.step.run_duration) }}</label>
{% elif step_result.status == StepStatus.FAILURE %}
<label for="{{ step_result.step.title }}" class="lbl-toggle failure">{{ step_result.step.title }}</label>
<label for="{{ step_result.step.title }}" class="lbl-toggle failure">{{ step_result.step.title }} | {{ format_duration(step_result.step.run_duration) }}</label>
{% else %}
<label for="{{ step_result.step.title }}" class="lbl-toggle">{{ step_result.step.title }}</label>
<label for="{{ step_result.step.title }}" class="lbl-toggle">{{ step_result.step.title }} | {{ format_duration(step_result.step.run_duration) }}</label>
{% endif %}
<div class="collapsible-content">
<div class="content-inner">

View File

@@ -18,8 +18,11 @@ import anyio
import asyncer
import click
import git
from ci_connector_ops.pipelines import consts, main_logger
from ci_connector_ops.utils import get_all_released_connectors, get_changed_connectors
from dagger import Config, Connection, Container, DaggerError, File, ImageLayerCompression, QueryError
from google.cloud import storage
from google.oauth2 import service_account
from more_itertools import chunked
if TYPE_CHECKING:
@@ -339,17 +342,74 @@ class DaggerPipelineCommand(click.Command):
Any: The invocation return value.
"""
command_name = self.name
click.secho(f"Running Dagger Command {command_name}...")
click.secho(
main_logger.info(f"Running Dagger Command {command_name}...")
main_logger.info(
"If you're running this command for the first time the Dagger engine image will be pulled, it can take a short minute..."
)
try:
ctx.obj["report_output_prefix"] = self.render_report_output_prefix(ctx)
if not ctx.obj["show_dagger_logs"]:
dagger_log_dir = Path(f"{consts.LOCAL_REPORTS_PATH_ROOT}/{ctx.obj['report_output_prefix']}")
dagger_log_dir.mkdir(parents=True, exist_ok=True)
dagger_log_path = Path(f"{dagger_log_dir}/dagger.log").resolve()
dagger_log_path.touch()
ctx.obj["dagger_logs_path"] = dagger_log_path
main_logger.info(f"Saving dagger logs to: {dagger_log_path}")
else:
ctx.obj["dagger_logs_path"] = None
pipeline_success = super().invoke(ctx)
if not pipeline_success:
raise DaggerError(f"Dagger Command {command_name} failed.")
except DaggerError as e:
click.secho(str(e), err=True, fg="red")
main_logger.error(f"Dagger Command {command_name} failed", exc_info=e)
sys.exit(1)
finally:
if ctx.obj.get("dagger_logs_path"):
if ctx.obj["is_local"]:
main_logger.info(f"Dagger logs saved to {ctx.obj['dagger_logs_path']}")
if ctx.obj["is_ci"]:
dagger_logs_gcs_key = f"{ctx.obj['report_output_prefix']}/dagger-logs.txt"
gcs_uri, public_url = upload_to_gcs(
ctx.obj["dagger_logs_path"], ctx.obj["ci_report_bucket_name"], dagger_logs_gcs_key, ctx.obj["ci_gcs_credentials"]
)
main_logger.info(f"Dagger logs saved to {gcs_uri}. Public URL: {public_url}")
@staticmethod
def render_report_output_prefix(ctx: click.Context) -> str:
"""Render the report output prefix for any command in the Connector CLI.
The goal is to standardize the output of all logs and reports generated by the CLI
related to a specific command, and to a specific CI context.
Note: We cannot hoist this higher in the command hierarchy because only one level of
subcommands is available at the time the context is created.
"""
git_branch = ctx.obj["git_branch"]
git_revision = ctx.obj["git_revision"]
pipeline_start_timestamp = ctx.obj["pipeline_start_timestamp"]
ci_context = ctx.obj["ci_context"]
ci_job_key = ctx.obj["ci_job_key"] if ctx.obj.get("ci_job_key") else ci_context
sanitized_branch = slugify(git_branch.replace("/", "_"))
# get the command name for the current context, if a group then prepend the parent command name
cmd = ctx.command_path.replace(" ", "/") if ctx.command_path else None
path_values = [
cmd,
ci_job_key,
sanitized_branch,
pipeline_start_timestamp,
git_revision,
]
# check all values are defined
if None in path_values:
raise ValueError(f"Missing value required to render the report output prefix: {path_values}")
# join all values with a slash, and convert all values to string
return "/".join(map(str, path_values))
async def execute_concurrently(steps: List[Callable], concurrency=5):
@@ -402,3 +462,32 @@ def sanitize_gcs_credentials(raw_value: Optional[str]) -> Optional[str]:
if raw_value is None:
return None
return json.dumps(json.loads(raw_value))
def format_duration(time_delta: datetime.timedelta) -> str:
total_seconds = time_delta.total_seconds()
if total_seconds < 60:
return "{:.2f}s".format(total_seconds)
minutes = int(total_seconds // 60)
seconds = int(total_seconds % 60)
return "{:02d}mn{:02d}s".format(minutes, seconds)
def upload_to_gcs(file_path: Path, bucket_name: str, object_name: str, credentials: str) -> Tuple[str, str]:
"""Upload a file to a GCS bucket.
Args:
file_path (Path): The path to the file to upload.
bucket_name (str): The name of the GCS bucket.
object_name (str): The name of the object in the GCS bucket.
credentials (str): The GCS credentials as a JSON string.
"""
credentials = service_account.Credentials.from_service_account_info(json.loads(credentials))
client = storage.Client(credentials=credentials)
bucket = client.get_bucket(bucket_name)
blob = bucket.blob(object_name)
blob.upload_from_filename(str(file_path))
gcs_uri = f"gs://{bucket_name}/{object_name}"
public_url = f"https://storage.googleapis.com/{bucket_name}/{object_name}"
return gcs_uri, public_url
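A hypothetical call, assuming `GCP_GSM_CREDENTIALS` holds a service account JSON string; the file path, bucket, and object names below are placeholders:

```python
import os
from pathlib import Path

# Hypothetical invocation of upload_to_gcs above; bucket and object names are placeholders.
gcs_uri, public_url = upload_to_gcs(
    file_path=Path("tools/ci_connector_ops/pipeline_reports/dagger.log"),
    bucket_name="my-ci-report-bucket",
    object_name="airbyte-ci/connectors/test/manual/my-branch/dagger-logs.txt",
    credentials=os.environ["GCP_GSM_CREDENTIALS"],
)
print(gcs_uri, public_url)
```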

View File

@@ -15,6 +15,7 @@ MAIN_REQUIREMENTS = [
"PyGithub~=1.58.0",
"rich",
"pydash~=7.0.4",
"google-cloud-storage~=2.8.0",
]

View File

@@ -4,7 +4,7 @@
from unittest import mock
import pytest
from ci_connector_ops.pipelines.commands.groups import connectors
from ci_connector_ops.pipelines import utils
@pytest.mark.parametrize(
@@ -12,8 +12,7 @@ from ci_connector_ops.pipelines.commands.groups import connectors
[
(
mock.MagicMock(
command_path="my_command_path",
invoked_subcommand="my_subcommand",
command_path="my command path",
obj={
"git_branch": "my_branch",
"git_revision": "my_git_revision",
@@ -22,12 +21,11 @@ from ci_connector_ops.pipelines.commands.groups import connectors
"ci_job_key": None,
},
),
"my_command_path/my_subcommand/my_ci_context/my_branch/my_pipeline_start_timestamp/my_git_revision",
"my/command/path/my_ci_context/my_branch/my_pipeline_start_timestamp/my_git_revision",
),
(
mock.MagicMock(
command_path="my_command_path",
invoked_subcommand="my_subcommand",
command_path="my command path",
obj={
"git_branch": "my_branch",
"git_revision": "my_git_revision",
@@ -36,12 +34,11 @@ from ci_connector_ops.pipelines.commands.groups import connectors
"ci_job_key": "my_ci_job_key",
},
),
"my_command_path/my_subcommand/my_ci_job_key/my_branch/my_pipeline_start_timestamp/my_git_revision",
"my/command/path/my_ci_job_key/my_branch/my_pipeline_start_timestamp/my_git_revision",
),
(
mock.MagicMock(
command_path="my_command_path another_command",
invoked_subcommand="my_subcommand",
command_path="my command path",
obj={
"git_branch": "my_branch",
"git_revision": "my_git_revision",
@@ -50,12 +47,11 @@ from ci_connector_ops.pipelines.commands.groups import connectors
"ci_job_key": "my_ci_job_key",
},
),
"my_command_path/another_command/my_subcommand/my_ci_job_key/my_branch/my_pipeline_start_timestamp/my_git_revision",
"my/command/path/my_ci_job_key/my_branch/my_pipeline_start_timestamp/my_git_revision",
),
(
mock.MagicMock(
command_path=None,
invoked_subcommand="my_subcommand",
command_path="my command path",
obj={
"git_branch": "my_branch",
"git_revision": "my_git_revision",
@@ -64,12 +60,11 @@ from ci_connector_ops.pipelines.commands.groups import connectors
"ci_job_key": "my_ci_job_key",
},
),
"my_subcommand/my_ci_job_key/my_branch/my_pipeline_start_timestamp/my_git_revision",
"my/command/path/my_ci_job_key/my_branch/my_pipeline_start_timestamp/my_git_revision",
),
(
mock.MagicMock(
command_path=None,
invoked_subcommand="my_subcommand",
command_path="my command path",
obj={
"git_branch": "my_branch/with/slashes",
"git_revision": "my_git_revision",
@@ -78,12 +73,11 @@ from ci_connector_ops.pipelines.commands.groups import connectors
"ci_job_key": "my_ci_job_key",
},
),
"my_subcommand/my_ci_job_key/my_branch_with_slashes/my_pipeline_start_timestamp/my_git_revision",
"my/command/path/my_ci_job_key/my_branch_with_slashes/my_pipeline_start_timestamp/my_git_revision",
),
(
mock.MagicMock(
command_path=None,
invoked_subcommand="my_subcommand",
command_path="my command path",
obj={
"git_branch": "my_branch/with/slashes#and!special@characters",
"git_revision": "my_git_revision",
@@ -92,9 +86,9 @@ from ci_connector_ops.pipelines.commands.groups import connectors
"ci_job_key": "my_ci_job_key",
},
),
"my_subcommand/my_ci_job_key/my_branch_with_slashesandspecialcharacters/my_pipeline_start_timestamp/my_git_revision",
"my/command/path/my_ci_job_key/my_branch_with_slashesandspecialcharacters/my_pipeline_start_timestamp/my_git_revision",
),
],
)
def test_render_report_output_prefix(ctx, expected):
assert connectors.render_report_output_prefix(ctx) == expected
assert utils.DaggerPipelineCommand.render_report_output_prefix(ctx) == expected