1
0
mirror of synced 2025-12-25 02:09:19 -05:00

airbyte-ci: rework RC promotion (#46675)

This commit is contained in:
Augustin
2024-10-14 18:27:14 +02:00
committed by GitHub
parent 5114bb5db9
commit 19f556f94d
22 changed files with 1434 additions and 1101 deletions

View File

@@ -37,6 +37,7 @@ poetry run pytest
```
## Changelog
- 0.10.0: Add `documentation_file_name` property to `Connector` class.
- 0.9.0: Add components path attribute for manifest-only connectors.
- 0.8.1: Gradle dependency discovery logic supports the Bulk CDK.
- 0.8.0: Add a `sbom_url` property to `Connector`

View File

@@ -301,6 +301,10 @@ class Connector:
return f"./docs/{relative_documentation_path}"
@property
def documentation_file_name(self) -> str:
return self.metadata.get("documentationUrl").split("/")[-1] + ".md"
@property
def documentation_file_path(self) -> Optional[Path]:
return Path(f"{self.relative_documentation_path_str}.md") if self.has_airbyte_docs else None

View File

@@ -1,4 +1,4 @@
# This file is automatically @generated by Poetry 1.8.2 and should not be changed by hand.
# This file is automatically @generated by Poetry 1.7.1 and should not be changed by hand.
[[package]]
name = "cachetools"
@@ -709,7 +709,6 @@ optional = false
python-versions = ">=3.9"
files = [
{file = "pandas-2.2.2-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:90c6fca2acf139569e74e8781709dccb6fe25940488755716d1d354d6bc58bce"},
{file = "pandas-2.2.2-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:c7adfc142dac335d8c1e0dcbd37eb8617eac386596eb9e1a1b77791cf2498238"},
{file = "pandas-2.2.2-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:4abfe0be0d7221be4f12552995e58723c7422c80a659da13ca382697de830c08"},
{file = "pandas-2.2.2-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:8635c16bf3d99040fdf3ca3db669a7250ddf49c55dc4aa8fe0ae0fa8d6dcc1f0"},
{file = "pandas-2.2.2-cp310-cp310-musllinux_1_1_aarch64.whl", hash = "sha256:40ae1dffb3967a52203105a077415a86044a2bea011b5f321c6aa64b379a3f51"},
@@ -723,14 +722,12 @@ files = [
{file = "pandas-2.2.2-cp311-cp311-musllinux_1_1_x86_64.whl", hash = "sha256:0cace394b6ea70c01ca1595f839cf193df35d1575986e484ad35c4aeae7266c1"},
{file = "pandas-2.2.2-cp311-cp311-win_amd64.whl", hash = "sha256:873d13d177501a28b2756375d59816c365e42ed8417b41665f346289adc68d24"},
{file = "pandas-2.2.2-cp312-cp312-macosx_10_9_x86_64.whl", hash = "sha256:9dfde2a0ddef507a631dc9dc4af6a9489d5e2e740e226ad426a05cabfbd7c8ef"},
{file = "pandas-2.2.2-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:e9b79011ff7a0f4b1d6da6a61aa1aa604fb312d6647de5bad20013682d1429ce"},
{file = "pandas-2.2.2-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:1cb51fe389360f3b5a4d57dbd2848a5f033350336ca3b340d1c53a1fad33bcad"},
{file = "pandas-2.2.2-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:eee3a87076c0756de40b05c5e9a6069c035ba43e8dd71c379e68cab2c20f16ad"},
{file = "pandas-2.2.2-cp312-cp312-musllinux_1_1_aarch64.whl", hash = "sha256:3e374f59e440d4ab45ca2fffde54b81ac3834cf5ae2cdfa69c90bc03bde04d76"},
{file = "pandas-2.2.2-cp312-cp312-musllinux_1_1_x86_64.whl", hash = "sha256:43498c0bdb43d55cb162cdc8c06fac328ccb5d2eabe3cadeb3529ae6f0517c32"},
{file = "pandas-2.2.2-cp312-cp312-win_amd64.whl", hash = "sha256:d187d355ecec3629624fccb01d104da7d7f391db0311145817525281e2804d23"},
{file = "pandas-2.2.2-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:0ca6377b8fca51815f382bd0b697a0814c8bda55115678cbc94c30aacbb6eff2"},
{file = "pandas-2.2.2-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:9057e6aa78a584bc93a13f0a9bf7e753a5e9770a30b4d758b8d5f2a62a9433cd"},
{file = "pandas-2.2.2-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:001910ad31abc7bf06f49dcc903755d2f7f3a9186c0c040b827e522e9cef0863"},
{file = "pandas-2.2.2-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:66b479b0bd07204e37583c191535505410daa8df638fd8e75ae1b383851fe921"},
{file = "pandas-2.2.2-cp39-cp39-musllinux_1_1_aarch64.whl", hash = "sha256:a77e9d1c386196879aa5eb712e77461aaee433e54c68cf253053a73b7e49c33a"},
@@ -1103,7 +1100,6 @@ files = [
{file = "PyYAML-6.0.1-cp311-cp311-win_amd64.whl", hash = "sha256:bf07ee2fef7014951eeb99f56f39c9bb4af143d8aa3c21b1677805985307da34"},
{file = "PyYAML-6.0.1-cp312-cp312-macosx_10_9_x86_64.whl", hash = "sha256:855fb52b0dc35af121542a76b9a84f8d1cd886ea97c84703eaa6d88e37a2ad28"},
{file = "PyYAML-6.0.1-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:40df9b996c2b73138957fe23a16a4f0ba614f4c0efce1e9406a184b6d07fa3a9"},
{file = "PyYAML-6.0.1-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:a08c6f0fe150303c1c6b71ebcd7213c2858041a7e01975da3a99aed1e7a378ef"},
{file = "PyYAML-6.0.1-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:6c22bec3fbe2524cde73d7ada88f6566758a8f7227bfbf93a408a9d86bcc12a0"},
{file = "PyYAML-6.0.1-cp312-cp312-musllinux_1_1_x86_64.whl", hash = "sha256:8d4e9c88387b0f5c7d5f281e55304de64cf7f9c0021a3525bd3b1c542da3b0e4"},
{file = "PyYAML-6.0.1-cp312-cp312-win32.whl", hash = "sha256:d483d2cdf104e7c9fa60c544d92981f12ad66a457afae824d146093b8c294c54"},
@@ -1191,6 +1187,17 @@ files = [
[package.dependencies]
pyasn1 = ">=0.1.3"
[[package]]
name = "semver"
version = "3.0.2"
description = "Python helper for Semantic Versioning (https://semver.org)"
optional = false
python-versions = ">=3.7"
files = [
{file = "semver-3.0.2-py3-none-any.whl", hash = "sha256:b1ea4686fe70b981f85359eda33199d60c53964284e0cfb4977d243e37cf4bf4"},
{file = "semver-3.0.2.tar.gz", hash = "sha256:6253adb39c70f6e51afed2fa7152bcd414c411286088fb4b9effb133885ab4cc"},
]
[[package]]
name = "simpleeval"
version = "0.9.13"
@@ -1356,4 +1363,4 @@ files = [
[metadata]
lock-version = "2.0"
python-versions = "^3.10"
content-hash = "b186349c72baec2f6e77709b273c0715d7a85f9b6134def3f6869b0a4562c6b2"
content-hash = "ebdbd7922a080c2cf0f07fcd3b4c5d41bb5e280c1bc7e80f76af5678654c2ed5"

View File

@@ -4,7 +4,7 @@ build-backend = "poetry.core.masonry.api"
[tool.poetry]
name = "connector_ops"
version = "0.9.0"
version = "0.10.0"
description = "Packaged maintained by the connector operations team to perform CI for connectors"
authors = ["Airbyte <contact@airbyte.io>"]
@@ -22,6 +22,7 @@ google-cloud-storage = "^2.8.0"
ci-credentials = {path = "../ci_credentials"}
pandas = "^2.0.3"
simpleeval = "^0.9.13"
semver = "^3.0.2"
[tool.poetry.group.dev.dependencies]
pytest = "^8"

View File

@@ -6,6 +6,7 @@ from contextlib import nullcontext as does_not_raise
from pathlib import Path
import pytest
import semver
from connector_ops import utils
@@ -45,7 +46,7 @@ class TestConnector:
assert isinstance(connector.support_level, str)
assert isinstance(connector.acceptance_test_config, dict)
assert connector.icon_path == Path(f"./airbyte-integrations/connectors/{connector.technical_name}/icon.svg")
assert len(connector.version.split(".")) == 3
assert semver.Version.parse(connector.version)
else:
assert connector.metadata is None
assert connector.support_level is None

View File

@@ -76,7 +76,8 @@ async def run_connector_version_bump_pipeline(
async with semaphore:
steps_results = []
async with context:
bump_version = BumpConnectorVersion(context, bump_type)
connector_directory = await context.get_connector_dir()
bump_version = BumpConnectorVersion(context, connector_directory, bump_type)
bump_version_result = await bump_version.run()
steps_results.append(bump_version_result)
if not bump_version_result.success:
@@ -88,9 +89,9 @@ async def run_connector_version_bump_pipeline(
for modified_file in bump_version.modified_files:
await updated_connector_directory.file(modified_file).export(str(context.connector.code_directory / modified_file))
context.logger.info(f"Exported {modified_file} following the version bump.")
documentation_directory = await context.get_repo_dir(include=[str(context.connector.local_connector_documentation_directory)])
add_changelog_entry = AddChangelogEntry(
context, bump_version.new_version, changelog_entry, pull_request_number=pull_request_number
context, documentation_directory, bump_version.new_version, changelog_entry, pull_request_number=pull_request_number
)
add_changelog_entry_result = await add_changelog_entry.run()
steps_results.append(add_changelog_entry_result)

View File

@@ -1,7 +1,7 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
from typing import List, Optional
from typing import List
import asyncclick as click
from pipelines.airbyte_ci.connectors.consts import CONNECTOR_TEST_STEP_ID

View File

@@ -193,6 +193,7 @@ async def run_connector_base_image_upgrade_pipeline(context: ConnectorContext, s
og_repo_dir = await context.get_repo_dir()
update_base_image_in_metadata = UpdateBaseImageMetadata(
context,
await context.get_connector_dir(),
set_if_not_exists=set_if_not_exists,
)
update_base_image_in_metadata_result = await update_base_image_in_metadata.run()
@@ -232,6 +233,7 @@ async def run_connector_migration_to_base_image_pipeline(context: ConnectorConte
# UPDATE BASE IMAGE IN METADATA
update_base_image_in_metadata = UpdateBaseImageMetadata(
context,
await context.get_connector_dir(),
set_if_not_exists=True,
)
update_base_image_in_metadata_result = await update_base_image_in_metadata.run()

View File

@@ -18,7 +18,6 @@ from pipelines.airbyte_ci.connectors.context import ConnectorContext, PipelineCo
from pipelines.airbyte_ci.connectors.reports import Report
from pipelines.consts import LOCAL_BUILD_PLATFORM
from pipelines.helpers.connectors.command import run_connector_steps
from pipelines.helpers.connectors.format import format_prettier
from pipelines.helpers.connectors.yaml import read_yaml, write_yaml
from pipelines.helpers.execution.run_steps import STEP_TREE, StepToRun
from pipelines.models.steps import Step, StepResult, StepStatus

View File

@@ -5,11 +5,57 @@
import copy
import logging
import typing
from typing import Any, Mapping, Optional, Set, Type
from typing import Any, Dict, Mapping, Optional, Set, Type
from pydantic import BaseModel
from .declarative_component_schema import *
from .declarative_component_schema import (
ApiKeyAuthenticator,
BasicHttpAuthenticator,
BearerAuthenticator,
CompositeErrorHandler,
ConstantBackoffStrategy,
CursorPagination,
CustomAuthenticator,
CustomBackoffStrategy,
CustomErrorHandler,
CustomIncrementalSync,
CustomPaginationStrategy,
CustomPartitionRouter,
CustomRecordExtractor,
CustomRecordFilter,
CustomRequester,
CustomRetriever,
CustomSchemaLoader,
CustomStateMigration,
CustomTransformation,
DatetimeBasedCursor,
DeclarativeSource,
DeclarativeStream,
DefaultErrorHandler,
DefaultPaginator,
DpathExtractor,
ExponentialBackoffStrategy,
HttpRequester,
HttpResponseFilter,
JsonFileSchemaLoader,
JwtAuthenticator,
LegacySessionTokenAuthenticator,
ListPartitionRouter,
MinMaxDatetime,
OAuthAuthenticator,
OffsetIncrement,
PageIncrement,
ParentStreamConfig,
RecordFilter,
RecordSelector,
SelectiveAuthenticator,
SessionTokenAuthenticator,
SimpleRetriever,
SubstreamPartitionRouter,
WaitTimeFromHeader,
WaitUntilTimeFromHeader,
)
PARAMETERS_STR = "$parameters"

View File

@@ -101,7 +101,7 @@ class ManifestReferenceResolver:
"""
return self._evaluate_node(manifest, manifest, set()) # type: ignore[no-any-return]
def _evaluate_node(self, node: Any, manifest: Mapping[str, Any], visited: Set[Any]) -> Any:
def _evaluate_node(self, node: Any, manifest: Mapping[str, Any], visited: Set[Any]) -> Any: # noqa: ANN401
if isinstance(node, dict):
evaluated_dict = {k: self._evaluate_node(v, manifest, visited) for k, v in node.items() if not self._is_ref_key(k)}
if REF_TAG in node:
@@ -126,7 +126,7 @@ class ManifestReferenceResolver:
else:
return node
def _lookup_ref_value(self, ref: str, manifest: Mapping[str, Any]) -> Any:
def _lookup_ref_value(self, ref: str, manifest: Mapping[str, Any]) -> Any: # noqa: ANN401
ref_match = re.match(r"#/(.*)", ref)
if not ref_match:
raise ValueError(f"Invalid reference format {ref}")
@@ -137,7 +137,7 @@ class ManifestReferenceResolver:
raise ValueError(f"{path}, {ref}")
@staticmethod
def _is_ref(node: Any) -> bool:
def _is_ref(node: Any) -> bool: # noqa: ANN401
return isinstance(node, str) and node.startswith("#/")
@staticmethod
@@ -145,7 +145,7 @@ class ManifestReferenceResolver:
return bool(key == REF_TAG)
@staticmethod
def _read_ref_value(ref: str, manifest_node: Mapping[str, Any]) -> Any:
def _read_ref_value(ref: str, manifest_node: Mapping[str, Any]) -> Any: # noqa: ANN401
"""
Read the value at the referenced location of the manifest.

View File

@@ -191,6 +191,7 @@ async def publish(
python_registry_url=python_registry_url,
python_registry_check_url=python_registry_check_url,
rollout_mode=rollout_mode,
ci_github_access_token=ctx.obj.get("ci_github_access_token"),
)
for connector in ctx.obj["selected_connectors_with_modified_files"]
]

View File

@@ -55,6 +55,7 @@ class PublishConnectorContext(ConnectorContext):
s3_build_cache_secret_key: Optional[Secret] = None,
use_local_cdk: bool = False,
python_registry_token: Optional[Secret] = None,
ci_github_access_token: Optional[Secret] = None,
) -> None:
self.pre_release = pre_release
self.spec_cache_bucket_name = spec_cache_bucket_name
@@ -94,6 +95,7 @@ class PublishConnectorContext(ConnectorContext):
docker_hub_password=docker_hub_password,
s3_build_cache_access_key_id=s3_build_cache_access_key_id,
s3_build_cache_secret_key=s3_build_cache_secret_key,
ci_github_access_token=ci_github_access_token,
)
# Reassigning current class required instance attribute

View File

@@ -6,28 +6,30 @@ import json
import os
import uuid
from datetime import datetime
from typing import Dict, List, Tuple
from pathlib import Path
from typing import Dict, Iterable, List, Tuple
import anyio
import semver
import yaml
from airbyte_protocol.models.airbyte_protocol import ConnectorSpecification # type: ignore
from connector_ops.utils import ConnectorLanguage # type: ignore
from dagger import Container, ExecError, File, ImageLayerCompression, Platform, QueryError
from connector_ops.utils import METADATA_FILE_NAME, ConnectorLanguage # type: ignore
from dagger import Container, Directory, ExecError, File, ImageLayerCompression, Platform, QueryError
from pipelines import consts
from pipelines.airbyte_ci.connectors.build_image import steps
from pipelines.airbyte_ci.connectors.publish.context import PublishConnectorContext, RolloutMode
from pipelines.airbyte_ci.connectors.reports import ConnectorReport
from pipelines.airbyte_ci.metadata.pipeline import (
MetadataPromoteReleaseCandidate,
MetadataRollbackReleaseCandidate,
MetadataUpload,
MetadataValidation,
)
from pipelines.airbyte_ci.metadata.pipeline import MetadataRollbackReleaseCandidate, MetadataUpload, MetadataValidation
from pipelines.airbyte_ci.steps.bump_version import SetConnectorVersion
from pipelines.airbyte_ci.steps.changelog import AddChangelogEntry
from pipelines.airbyte_ci.steps.pull_request import CreateOrUpdatePullRequest
from pipelines.airbyte_ci.steps.python_registry import PublishToPythonRegistry, PythonRegistryPublishContext
from pipelines.consts import LOCAL_BUILD_PLATFORM
from pipelines.dagger.actions.remote_storage import upload_to_gcs
from pipelines.dagger.actions.system import docker
from pipelines.helpers.connectors.dagger_fs import dagger_read_file, dagger_write_file
from pipelines.helpers.pip import is_package_published
from pipelines.models.steps import Step, StepResult, StepStatus
from pipelines.models.steps import Step, StepModifyingFiles, StepResult, StepStatus
from pydantic import BaseModel, ValidationError
@@ -384,6 +386,74 @@ class UploadSbom(Step):
return StepResult(step=self, status=StepStatus.SUCCESS, stdout="Uploaded SBOM to metadata service bucket.")
class SetPromotedVersion(SetConnectorVersion):
context: PublishConnectorContext
title = "Promote release candidate"
@property
def current_semver_version(self) -> semver.Version:
return semver.Version.parse(self.context.connector.version)
@property
def promoted_semver_version(self) -> semver.Version:
return self.current_semver_version.replace(prerelease=None)
@property
def promoted_version(self) -> str:
return str(self.promoted_semver_version)
@property
def current_version_is_rc(self) -> bool:
return bool(self.current_semver_version.prerelease and "rc" in self.current_semver_version.prerelease)
def __init__(self, context: PublishConnectorContext, connector_directory: Directory) -> None:
self.context = context
super().__init__(context, connector_directory, self.promoted_version)
async def _run(self) -> StepResult:
if not self.current_version_is_rc:
return StepResult(step=self, status=StepStatus.SKIPPED, stdout="The connector version has no rc suffix.")
return await super()._run()
class ResetReleaseCandidate(StepModifyingFiles):
context: PublishConnectorContext
title = "Reset release candidate flag"
async def _run(self) -> StepResult:
raw_metadata = await dagger_read_file(await self.context.get_connector_dir(include=METADATA_FILE_NAME), METADATA_FILE_NAME)
current_metadata = yaml.safe_load(raw_metadata)
is_release_candidate = current_metadata.get("data", {}).get("releases", {}).get("isReleaseCandidate", False)
if not is_release_candidate:
return StepResult(step=self, status=StepStatus.SKIPPED, stdout="The connector is not a release candidate.")
# We do an in-place replacement instead of serializing back to yaml to preserve comments and formatting.
new_raw_metadata = raw_metadata.replace("isReleaseCandidate: true", "isReleaseCandidate: false")
self.modified_directory = dagger_write_file(self.modified_directory, METADATA_FILE_NAME, new_raw_metadata)
self.modified_files.append(METADATA_FILE_NAME)
return StepResult(
step=self,
status=StepStatus.SUCCESS,
stdout="Set the isReleaseCandidate flag to false in the metadata file.",
output=self.modified_directory,
)
# Helpers
def create_connector_report(results: List[StepResult], context: PublishConnectorContext) -> ConnectorReport:
"""Generate a connector report from results and assign it to the context.
Args:
results (List[StepResult]): List of step results.
context (PublishConnectorContext): The connector context to assign the report to.
Returns:
ConnectorReport: The connector report.
"""
report = ConnectorReport(context, results, name="PUBLISH RESULTS")
context.report = report
return report
# Pipeline
async def run_connector_publish_pipeline(context: PublishConnectorContext, semaphore: anyio.Semaphore) -> ConnectorReport:
"""Run a publish pipeline for a single connector.
@@ -415,11 +485,6 @@ async def run_connector_publish_pipeline(context: PublishConnectorContext, semap
upload_sbom_step = UploadSbom(context)
def create_connector_report(results: List[StepResult]) -> ConnectorReport:
report = ConnectorReport(context, results, name="PUBLISH RESULTS")
context.report = report
return report
async with semaphore:
async with context:
results = []
@@ -429,14 +494,14 @@ async def run_connector_publish_pipeline(context: PublishConnectorContext, semap
# Exit early if the metadata file is invalid.
if metadata_validation_results.status is not StepStatus.SUCCESS:
return create_connector_report(results)
return create_connector_report(results, context)
check_connector_image_results = await CheckConnectorImageDoesNotExist(context).run()
results.append(check_connector_image_results)
python_registry_steps, terminate_early = await _run_python_registry_publish_pipeline(context)
results.extend(python_registry_steps)
if terminate_early:
return create_connector_report(results)
return create_connector_report(results, context)
# If the connector image already exists, we don't need to build it, but we still need to upload the metadata file.
# We also need to upload the spec to the spec cache bucket.
@@ -448,26 +513,26 @@ async def run_connector_publish_pipeline(context: PublishConnectorContext, semap
upload_to_spec_cache_results = await upload_spec_to_cache_step.run(already_published_connector)
results.append(upload_to_spec_cache_results)
if upload_to_spec_cache_results.status is not StepStatus.SUCCESS:
return create_connector_report(results)
return create_connector_report(results, context)
upload_sbom_results = await upload_sbom_step.run()
results.append(upload_sbom_results)
if upload_sbom_results.status is not StepStatus.SUCCESS:
return create_connector_report(results)
return create_connector_report(results, context)
metadata_upload_results = await metadata_upload_step.run()
results.append(metadata_upload_results)
# Exit early if the connector image already exists or has failed to build
if check_connector_image_results.status is not StepStatus.SUCCESS:
return create_connector_report(results)
return create_connector_report(results, context)
build_connector_results = await steps.run_connector_build(context)
results.append(build_connector_results)
# Exit early if the connector image failed to build
if build_connector_results.status is not StepStatus.SUCCESS:
return create_connector_report(results)
return create_connector_report(results, context)
if context.connector.language in [ConnectorLanguage.PYTHON, ConnectorLanguage.LOW_CODE]:
upload_dependencies_step = await UploadDependenciesToMetadataService(context).run(build_connector_results.output)
@@ -479,7 +544,7 @@ async def run_connector_publish_pipeline(context: PublishConnectorContext, semap
# Exit early if the connector image failed to push
if push_connector_image_results.status is not StepStatus.SUCCESS:
return create_connector_report(results)
return create_connector_report(results, context)
# Make sure the image published is healthy by pulling it and running SPEC on it.
# See https://github.com/airbytehq/airbyte/issues/26085
@@ -488,21 +553,21 @@ async def run_connector_publish_pipeline(context: PublishConnectorContext, semap
# Exit early if the connector image failed to pull
if pull_connector_image_results.status is not StepStatus.SUCCESS:
return create_connector_report(results)
return create_connector_report(results, context)
upload_to_spec_cache_results = await upload_spec_to_cache_step.run(built_connector_platform_variants[0])
results.append(upload_to_spec_cache_results)
if upload_to_spec_cache_results.status is not StepStatus.SUCCESS:
return create_connector_report(results)
return create_connector_report(results, context)
upload_sbom_results = await upload_sbom_step.run()
results.append(upload_sbom_results)
if upload_sbom_results.status is not StepStatus.SUCCESS:
return create_connector_report(results)
return create_connector_report(results, context)
metadata_upload_results = await metadata_upload_step.run()
results.append(metadata_upload_results)
connector_report = create_connector_report(results)
connector_report = create_connector_report(results, context)
return connector_report
@@ -565,8 +630,24 @@ async def run_connector_rollback_pipeline(context: PublishConnectorContext, sema
context, context.metadata_bucket_name, context.metadata_service_gcs_credentials
).run()
)
connector_report = create_connector_report(results, context)
return ConnectorReport(context, results, name="ROLLBACK RESULTS")
return connector_report
def get_promotion_pr_creation_arguments(
modified_files: Iterable[Path],
context: PublishConnectorContext,
step_results: Iterable[StepResult],
release_candidate_version: str,
promoted_version: str,
) -> Tuple[Tuple, Dict]:
return (modified_files,), {
"branch_id": f"{context.connector.technical_name}/{promoted_version}",
"commit_message": "\n".join(step_result.step.title for step_result in step_results if step_result.success),
"pr_title": f"🐙 {context.connector.technical_name}: release {promoted_version}",
"pr_body": f"The release candidate version {release_candidate_version} has been deemed stable and is now ready to be promoted to an official release ({promoted_version}).",
}
async def run_connector_promote_pipeline(context: PublishConnectorContext, semaphore: anyio.Semaphore) -> ConnectorReport:
@@ -578,22 +659,69 @@ async def run_connector_promote_pipeline(context: PublishConnectorContext, semap
Returns:
ConnectorReport: The reports holding promote results.
"""
results = []
current_version = context.connector.version
all_modified_files = set()
async with semaphore:
async with context:
assert context.rollout_mode == RolloutMode.PROMOTE, "This pipeline can only run in promote mode."
assert context.connector.metadata.get("releases", {}).get(
"isReleaseCandidate", True
), "This pipeline can only run for release candidates."
metadata_promote_result = await MetadataPromoteReleaseCandidate(
context, context.metadata_bucket_name, context.metadata_service_gcs_credentials
).run()
results.append(metadata_promote_result)
if metadata_promote_result.status is StepStatus.FAILURE:
return ConnectorReport(context, results, name="PROMOTE RESULTS")
publish_latest_tag_results = await PushVersionImageAsLatest(context).run()
results.append(publish_latest_tag_results)
return ConnectorReport(context, results, name="PROMOTE RESULTS")
original_connector_directory = await context.get_connector_dir()
# Remove RC suffix
set_promoted_version = SetPromotedVersion(context, original_connector_directory)
set_promoted_version_results = await set_promoted_version.run()
results.append(set_promoted_version_results)
if set_promoted_version_results.success:
all_modified_files.update(await set_promoted_version.export_modified_files(context.connector.code_directory))
# Set isReleaseCandidate to False
reset_release_candidate = ResetReleaseCandidate(context, set_promoted_version_results.output)
reset_release_candidate_results = await reset_release_candidate.run()
results.append(reset_release_candidate_results)
if reset_release_candidate_results.success:
all_modified_files.update(await reset_release_candidate.export_modified_files(context.connector.code_directory))
if not all([result.success for result in results]):
context.logger.error("The metadata update failed. Skipping PR creation.")
connector_report = create_connector_report(results, context)
return connector_report
# Open PR when all previous steps are successful
promoted_version = set_promoted_version.promoted_version
initial_pr_creation = CreateOrUpdatePullRequest(context, skip_ci=True)
pr_creation_args, pr_creation_kwargs = get_promotion_pr_creation_arguments(
all_modified_files, context, results, current_version, promoted_version
)
initial_pr_creation_result = await initial_pr_creation.run(*pr_creation_args, **pr_creation_kwargs)
results.append(initial_pr_creation_result)
# Update changelog and update PR
if initial_pr_creation_result.success:
created_pr = initial_pr_creation_result.output
documentation_directory = await context.get_repo_dir(
include=[str(context.connector.local_connector_documentation_directory)]
).directory(str(context.connector.local_connector_documentation_directory))
add_changelog_entry = AddChangelogEntry(
context,
documentation_directory,
promoted_version,
f"Promoting release candidate {current_version} to a main version.",
created_pr.number,
)
add_changelog_entry_result = await add_changelog_entry.run()
results.append(add_changelog_entry_result)
if add_changelog_entry_result.success:
all_modified_files.update(
await add_changelog_entry.export_modified_files(context.connector.local_connector_documentation_directory)
)
post_changelog_pr_update = CreateOrUpdatePullRequest(context, skip_ci=False, labels=["auto-merge"])
pr_creation_args, pr_creation_kwargs = get_promotion_pr_creation_arguments(
all_modified_files, context, results, current_version, promoted_version
)
post_changelog_pr_update_result = await post_changelog_pr_update.run(*pr_creation_args, **pr_creation_kwargs)
results.append(post_changelog_pr_update_result)
connector_report = create_connector_report(results, context)
return connector_report
def reorder_contexts(contexts: List[PublishConnectorContext]) -> List[PublishConnectorContext]:

View File

@@ -7,7 +7,6 @@ from datetime import datetime, timezone
from pathlib import Path
from typing import TYPE_CHECKING
import dagger
from jinja2 import Environment, PackageLoader, select_autoescape
from pipelines.airbyte_ci.connectors.build_image.steps.python_connectors import BuildConnectorImages
from pipelines.airbyte_ci.connectors.context import ConnectorContext
@@ -35,17 +34,6 @@ CHANGELOG_ENTRY_COMMENT = "Update dependencies"
## HELPER FUNCTIONS
async def export_modified_files(
step_with_modified_files: UpdateBaseImageMetadata | PoetryUpdate | BumpConnectorVersion | AddChangelogEntry,
directory_modified_by_step: dagger.Directory,
export_to_directory: Path,
) -> Set[Path]:
modified_files = set()
for modified_file in step_with_modified_files.modified_files:
local_path = export_to_directory / modified_file
await directory_modified_by_step.file(str(modified_file)).export(str(local_path))
modified_files.add(local_path)
return modified_files
def get_pr_body(context: ConnectorContext, step_results: Iterable[StepResult], dependency_updates: Iterable[DependencyUpdate]) -> str:
@@ -96,43 +84,38 @@ async def run_connector_up_to_date_pipeline(
new_version: str | None = None
connector_directory = await context.get_connector_dir()
upgrade_base_image_in_metadata = UpdateBaseImageMetadata(context, connector_directory=connector_directory)
upgrade_base_image_in_metadata = UpdateBaseImageMetadata(context, connector_directory)
upgrade_base_image_in_metadata_result = await upgrade_base_image_in_metadata.run()
step_results.append(upgrade_base_image_in_metadata_result)
if upgrade_base_image_in_metadata_result.success:
connector_directory = upgrade_base_image_in_metadata_result.output
exported_modified_files = await export_modified_files(
upgrade_base_image_in_metadata, connector_directory, context.connector.code_directory
)
exported_modified_files = await upgrade_base_image_in_metadata.export_modified_files(context.connector.code_directory)
context.logger.info(f"Exported files following the base image upgrade: {exported_modified_files}")
all_modified_files.update(exported_modified_files)
connector_directory = upgrade_base_image_in_metadata_result.output
if context.connector.is_using_poetry:
# We run the poetry update step after the base image upgrade because the base image upgrade may change the python environment
poetry_update = PoetryUpdate(context, specific_dependencies=specific_dependencies, connector_directory=connector_directory)
poetry_update = PoetryUpdate(context, connector_directory, specific_dependencies=specific_dependencies)
poetry_update_result = await poetry_update.run()
step_results.append(poetry_update_result)
if poetry_update_result.success:
connector_directory = poetry_update_result.output
exported_modified_files = await export_modified_files(
poetry_update, connector_directory, context.connector.code_directory
)
exported_modified_files = await poetry_update.export_modified_files(context.connector.code_directory)
context.logger.info(f"Exported files following the Poetry update: {exported_modified_files}")
all_modified_files.update(exported_modified_files)
connector_directory = poetry_update_result.output
one_previous_step_is_successful = any(step_result.success for step_result in step_results)
# NOTE:
# BumpConnectorVersion will already work for manifest-only and Java connectors too
if bump_connector_version and one_previous_step_is_successful:
bump_version = BumpConnectorVersion(context, BUMP_TYPE, connector_directory=connector_directory)
bump_version = BumpConnectorVersion(context, connector_directory, BUMP_TYPE)
bump_version_result = await bump_version.run()
step_results.append(bump_version_result)
if bump_version_result.success:
new_version = bump_version.new_version
exported_modified_files = await export_modified_files(
bump_version, bump_version_result.output, context.connector.code_directory
)
exported_modified_files = await bump_version.export_modified_files(context.connector.code_directory)
context.logger.info(f"Exported files following the version bump: {exported_modified_files}")
all_modified_files.update(exported_modified_files)
@@ -171,12 +154,17 @@ async def run_connector_up_to_date_pipeline(
created_pr = initial_pr_creation_result.output
if new_version and created_pr:
add_changelog_entry = AddChangelogEntry(context, new_version, CHANGELOG_ENTRY_COMMENT, created_pr.number)
documentation_directory = await context.get_repo_dir(
include=[str(context.connector.local_connector_documentation_directory)]
).directory(str(context.connector.local_connector_documentation_directory))
add_changelog_entry = AddChangelogEntry(
context, documentation_directory, new_version, CHANGELOG_ENTRY_COMMENT, created_pr.number
)
add_changelog_entry_result = await add_changelog_entry.run()
step_results.append(add_changelog_entry_result)
if add_changelog_entry_result.success:
# File path modified by the changelog entry step are relative to the repo root
exported_modified_files = await export_modified_files(add_changelog_entry, add_changelog_entry_result.output, Path("."))
exported_modified_files = await add_changelog_entry.export_modified_files(Path("."))
context.logger.info(f"Exported files following the changelog entry: {exported_modified_files}")
all_modified_files.update(exported_modified_files)
final_labels = DEFAULT_PR_LABELS + [AUTO_MERGE_PR_LABEL] if auto_merge else DEFAULT_PR_LABELS

View File

@@ -16,31 +16,28 @@ from connector_ops.utils import POETRY_LOCK_FILE_NAME, PYPROJECT_FILE_NAME # ty
from deepdiff import DeepDiff # type: ignore
from pipelines.airbyte_ci.connectors.context import ConnectorContext, PipelineContext
from pipelines.consts import LOCAL_BUILD_PLATFORM
from pipelines.models.steps import Step, StepResult, StepStatus
from pipelines.models.steps import Step, StepModifyingFiles, StepResult, StepStatus
if TYPE_CHECKING:
from typing import List
class PoetryUpdate(Step):
class PoetryUpdate(StepModifyingFiles):
context: ConnectorContext
dev: bool
specified_versions: dict[str, str]
modified_files: List[str]
title = "Update versions of libraries in poetry."
def __init__(
self,
context: PipelineContext,
connector_directory: dagger.Directory,
dev: bool = False,
specific_dependencies: List[str] | None = None,
connector_directory: dagger.Directory | None = None,
) -> None:
super().__init__(context)
super().__init__(context, connector_directory)
self.dev = dev
self.specified_versions = self.parse_specific_dependencies(specific_dependencies) if specific_dependencies else {}
self.connector_directory = connector_directory
self.modified_files = []
@staticmethod
def parse_specific_dependencies(specific_dependencies: List[str]) -> dict[str, str]:
@@ -56,7 +53,7 @@ class PoetryUpdate(Step):
return versions
async def _run(self) -> StepResult:
connector_directory = self.connector_directory or await self.context.get_connector_dir()
connector_directory = self.modified_directory
if PYPROJECT_FILE_NAME not in await connector_directory.entries():
return StepResult(step=self, status=StepStatus.SKIPPED, stderr=f"Connector does not have a {PYPROJECT_FILE_NAME}")
@@ -84,6 +81,7 @@ class PoetryUpdate(Step):
original_poetry_lock != await connector_container.file(POETRY_LOCK_FILE_NAME).contents()
or original_pyproject_file != await connector_container.file(PYPROJECT_FILE_NAME).contents()
):
self.modified_directory = await connector_container.directory(".")
self.modified_files = [POETRY_LOCK_FILE_NAME, PYPROJECT_FILE_NAME]
return StepResult(step=self, status=StepStatus.SUCCESS, output=connector_container.directory("."))

View File

@@ -9,10 +9,10 @@ from base_images import version_registry # type: ignore
from connector_ops.utils import METADATA_FILE_NAME # type: ignore
from pipelines.airbyte_ci.connectors.context import ConnectorContext
from pipelines.helpers.connectors.dagger_fs import dagger_read_file, dagger_write_file
from pipelines.models.steps import Step, StepResult, StepStatus
from pipelines.models.steps import StepModifyingFiles, StepResult, StepStatus
if TYPE_CHECKING:
from typing import List, Optional
from typing import Optional
import dagger
@@ -21,20 +21,18 @@ class NoBaseImageAddressInMetadataError(Exception):
pass
class UpdateBaseImageMetadata(Step):
class UpdateBaseImageMetadata(StepModifyingFiles):
context: ConnectorContext
title = "Upgrade the base image to the latest version in metadata.yaml"
modified_files: List[str]
def __init__(
self,
context: ConnectorContext,
connector_directory: dagger.Directory,
set_if_not_exists: bool = False,
connector_directory: Optional[dagger.Directory] = None,
) -> None:
super().__init__(context)
super().__init__(context, connector_directory)
self.set_if_not_exists = set_if_not_exists
self.modified_files = []
self.connector_directory = connector_directory

View File

@@ -12,10 +12,10 @@ from connector_ops.utils import METADATA_FILE_NAME, PYPROJECT_FILE_NAME # type:
from pipelines.airbyte_ci.connectors.context import ConnectorContext
from pipelines.dagger.actions.python.poetry import with_poetry
from pipelines.helpers.connectors.dagger_fs import dagger_read_file, dagger_write_file
from pipelines.models.steps import Step, StepResult, StepStatus
from pipelines.models.steps import StepModifyingFiles, StepResult, StepStatus
if TYPE_CHECKING:
from typing import List
pass
class ConnectorVersionNotFoundError(Exception):
@@ -26,9 +26,8 @@ class PoetryVersionBumpError(Exception):
pass
class SetConnectorVersion(Step):
class SetConnectorVersion(StepModifyingFiles):
context: ConnectorContext
modified_files: List[str]
@property
def title(self) -> str:
@@ -37,13 +36,11 @@ class SetConnectorVersion(Step):
def __init__(
self,
context: ConnectorContext,
connector_directory: dagger.Directory,
new_version: str,
connector_directory: dagger.Directory | None = None,
) -> None:
super().__init__(context)
super().__init__(context, connector_directory)
self.new_version = new_version
self.modified_files = []
self.connector_directory = connector_directory
@staticmethod
async def _set_version_in_metadata(new_version: str, connector_directory: dagger.Directory) -> dagger.Directory:
@@ -77,9 +74,9 @@ class SetConnectorVersion(Step):
return connector_directory_with_updated_pyproject
async def _run(self) -> StepResult:
original_connector_directory = self.connector_directory or await self.context.get_connector_dir()
original_connector_directory = self.modified_directory
try:
updated_connector_directory = await self._set_version_in_metadata(self.new_version, original_connector_directory)
self.modified_directory = await self._set_version_in_metadata(self.new_version, original_connector_directory)
self.modified_files.append(METADATA_FILE_NAME)
except (FileNotFoundError, ConnectorVersionNotFoundError) as e:
return StepResult(
@@ -92,8 +89,8 @@ class SetConnectorVersion(Step):
if self.context.connector.pyproject_file_path.is_file():
try:
poetry_container = with_poetry(self.context)
updated_connector_directory = await self._set_version_in_poetry_package(
poetry_container, updated_connector_directory, self.new_version
self.modified_directory = await self._set_version_in_poetry_package(
poetry_container, self.modified_directory, self.new_version
)
self.modified_files.append(PYPROJECT_FILE_NAME)
except PoetryVersionBumpError as e:
@@ -108,7 +105,7 @@ class SetConnectorVersion(Step):
step=self,
status=StepStatus.SUCCESS,
stdout=f"Updated connector to {self.new_version}",
output=updated_connector_directory,
output=self.modified_directory,
)
@@ -116,11 +113,16 @@ class BumpConnectorVersion(SetConnectorVersion):
def __init__(
self,
context: ConnectorContext,
connector_directory: dagger.Directory,
bump_type: str,
connector_directory: dagger.Directory | None = None,
) -> None:
self.bump_type = bump_type
super().__init__(context, self.get_bumped_version(context.connector.version, bump_type), connector_directory=connector_directory)
new_version = self.get_bumped_version(context.connector.version, bump_type)
super().__init__(
context,
connector_directory,
new_version,
)
@property
def title(self) -> str:

View File

@@ -5,69 +5,66 @@
from __future__ import annotations
import datetime
from typing import TYPE_CHECKING
import semver
from dagger import Directory
from pipelines.airbyte_ci.connectors.context import ConnectorContext
from pipelines.helpers.changelog import Changelog
from pipelines.models.steps import Step, StepResult, StepStatus
if TYPE_CHECKING:
from typing import List
from pipelines.helpers.connectors.dagger_fs import dagger_read_file, dagger_write_file
from pipelines.models.steps import StepModifyingFiles, StepResult, StepStatus
class AddChangelogEntry(Step):
class AddChangelogEntry(StepModifyingFiles):
context: ConnectorContext
modified_files: List[str]
title = "Add changelog entry"
def __init__(
self,
context: ConnectorContext,
documentation_directory: Directory,
new_version: str,
comment: str,
pull_request_number: str | int | None,
repo_dir: Directory | None = None,
) -> None:
super().__init__(context)
super().__init__(context, documentation_directory)
self.new_version = semver.VersionInfo.parse(new_version)
self.comment = comment
self.pull_request_number = pull_request_number or "*PR_NUMBER_PLACEHOLDER*"
self.modified_files = []
self.repo_dir = repo_dir
async def _run(self, pull_request_number: int | str | None = None) -> StepResult:
if self.repo_dir is None:
self.repo_dir = await self.context.get_repo_dir(include=[str(self.context.connector.local_connector_documentation_directory)])
if pull_request_number is None:
# this allows passing it dynamically from a result of another action (like creating a pull request)
pull_request_number = self.pull_request_number
doc_path = self.context.connector.documentation_file_path
if not doc_path or not doc_path.exists():
try:
original_markdown = await dagger_read_file(self.modified_directory, self.context.connector.documentation_file_name)
except FileNotFoundError:
return StepResult(
step=self,
status=StepStatus.SKIPPED,
stdout="Connector does not have a documentation file.",
output=self.repo_dir,
stderr="Connector does not have a documentation file.",
)
try:
original_markdown = doc_path.read_text()
changelog = Changelog(original_markdown)
changelog.add_entry(self.new_version, datetime.date.today(), pull_request_number, self.comment)
updated_doc = changelog.to_markdown()
except Exception as e:
return StepResult(
step=self, status=StepStatus.FAILURE, stderr=f"Could not add changelog entry: {e}", output=self.repo_dir, exc_info=e
step=self,
status=StepStatus.FAILURE,
stderr=f"Could not add changelog entry: {e}",
output=self.modified_directory,
exc_info=e,
)
self.repo_dir = self.repo_dir.with_new_file(str(doc_path), contents=updated_doc)
self.modified_files.append(doc_path)
self.modified_directory = dagger_write_file(self.modified_directory, self.context.connector.documentation_file_name, updated_doc)
self.modified_files.append(self.context.connector.documentation_file_name)
return StepResult(
step=self,
status=StepStatus.SUCCESS,
stdout=f"Added changelog entry to {doc_path}",
output=self.repo_dir,
stdout=f"Added changelog entry to {self.context.connector.documentation_file_name}",
output=self.modified_directory,
)

View File

@@ -10,7 +10,6 @@ import os
import shutil
import ssl
import sys
import tempfile
import urllib.request
from typing import TYPE_CHECKING

View File

@@ -23,7 +23,8 @@ from pipelines.models.artifacts import Artifact
from pipelines.models.secrets import Secret
if TYPE_CHECKING:
from typing import Any, ClassVar, Optional, Union
import dagger
from typing import Any, ClassVar, Optional, Union, Set
from pipelines.airbyte_ci.format.format_command import FormatCommand
from pipelines.models.contexts.pipeline_context import PipelineContext
@@ -414,3 +415,25 @@ class Step(ABC):
status=StepStatus.FAILURE,
stdout=f"Timed out after the max duration of {format_duration(self.max_duration)}. Please checkout the Dagger logs to see what happened.",
)
class StepModifyingFiles(Step):
modified_files: List[str]
modified_directory: dagger.Directory
def __init__(self, context: PipelineContext, modified_directory: dagger.Directory, secrets: List[Secret] | None = None) -> None:
super().__init__(context, secrets)
self.modified_directory = modified_directory
self.modified_files = []
async def export_modified_files(
self,
export_to_directory: Path,
) -> Set[Path]:
modified_files = set()
for modified_file in self.modified_files:
local_path = export_to_directory / modified_file
await self.modified_directory.file(str(modified_file)).export(str(local_path))
modified_files.add(local_path)
return modified_files

File diff suppressed because it is too large Load Diff