metadata-service[orchestrator]: generate connector registry with release candidates (#44588)
This commit is contained in:
@@ -0,0 +1,49 @@
|
||||
# generated by datamodel-codegen:
|
||||
# filename: ConnectorBreakingChanges.yaml
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from datetime import date
|
||||
from typing import Any, Dict, List, Optional
|
||||
|
||||
from pydantic import AnyUrl, BaseModel, Extra, Field, constr
|
||||
|
||||
|
||||
class StreamBreakingChangeScope(BaseModel):
|
||||
class Config:
|
||||
extra = Extra.forbid
|
||||
|
||||
scopeType: Any = Field("stream", const=True)
|
||||
impactedScopes: List[str] = Field(..., description="List of streams that are impacted by the breaking change.", min_items=1)
|
||||
|
||||
|
||||
class BreakingChangeScope(BaseModel):
|
||||
__root__: StreamBreakingChangeScope = Field(..., description="A scope that can be used to limit the impact of a breaking change.")
|
||||
|
||||
|
||||
class VersionBreakingChange(BaseModel):
|
||||
class Config:
|
||||
extra = Extra.forbid
|
||||
|
||||
upgradeDeadline: date = Field(..., description="The deadline by which to upgrade before the breaking change takes effect.")
|
||||
message: str = Field(..., description="Descriptive message detailing the breaking change.")
|
||||
migrationDocumentationUrl: Optional[AnyUrl] = Field(
|
||||
None,
|
||||
description="URL to documentation on how to migrate to the current version. Defaults to ${documentationUrl}-migrations#${version}",
|
||||
)
|
||||
scopedImpact: Optional[List[BreakingChangeScope]] = Field(
|
||||
None,
|
||||
description="List of scopes that are impacted by the breaking change. If not specified, the breaking change cannot be scoped to reduce impact via the supported scope types.",
|
||||
min_items=1,
|
||||
)
|
||||
|
||||
|
||||
class ConnectorBreakingChanges(BaseModel):
|
||||
class Config:
|
||||
extra = Extra.forbid
|
||||
|
||||
__root__: Dict[constr(regex=r"^\d+\.\d+\.\d+$"), VersionBreakingChange] = Field(
|
||||
...,
|
||||
description="Each entry denotes a breaking change in a specific version of a connector that requires user action to upgrade.",
|
||||
title="ConnectorBreakingChanges",
|
||||
)
|
||||
@@ -267,7 +267,9 @@ class ConnectorBreakingChanges(BaseModel):
|
||||
extra = Extra.forbid
|
||||
|
||||
__root__: Dict[constr(regex=r"^\d+\.\d+\.\d+$"), VersionBreakingChange] = Field(
|
||||
..., description="Each entry denotes a breaking change in a specific version of a connector that requires user action to upgrade."
|
||||
...,
|
||||
description="Each entry denotes a breaking change in a specific version of a connector that requires user action to upgrade.",
|
||||
title="ConnectorBreakingChanges",
|
||||
)
|
||||
|
||||
|
||||
|
||||
@@ -86,6 +86,16 @@ class StreamBreakingChangeScope(BaseModel):
|
||||
impactedScopes: List[str] = Field(..., description="List of streams that are impacted by the breaking change.", min_items=1)
|
||||
|
||||
|
||||
class SuggestedStreams(BaseModel):
|
||||
class Config:
|
||||
extra = Extra.allow
|
||||
|
||||
streams: Optional[List[str]] = Field(
|
||||
None,
|
||||
description="An array of streams that this connector suggests the average user will want. SuggestedStreams not being present for the source means that all streams are suggested. An empty list here means that no streams are suggested.",
|
||||
)
|
||||
|
||||
|
||||
class AirbyteInternal(BaseModel):
|
||||
class Config:
|
||||
extra = Extra.allow
|
||||
@@ -182,20 +192,9 @@ class ConnectorBreakingChanges(BaseModel):
|
||||
extra = Extra.forbid
|
||||
|
||||
__root__: Dict[constr(regex=r"^\d+\.\d+\.\d+$"), VersionBreakingChange] = Field(
|
||||
..., description="Each entry denotes a breaking change in a specific version of a connector that requires user action to upgrade."
|
||||
)
|
||||
|
||||
|
||||
class ConnectorReleases(BaseModel):
|
||||
class Config:
|
||||
extra = Extra.forbid
|
||||
|
||||
isReleaseCandidate: Optional[bool] = Field(False, description="Whether the release is eligible to be a release candidate.")
|
||||
rolloutConfiguration: Optional[RolloutConfiguration] = None
|
||||
breakingChanges: ConnectorBreakingChanges
|
||||
migrationDocumentationUrl: Optional[AnyUrl] = Field(
|
||||
None,
|
||||
description="URL to documentation on how to migrate from the previous version to the current version. Defaults to ${documentationUrl}-migrations",
|
||||
...,
|
||||
description="Each entry denotes a breaking change in a specific version of a connector that requires user action to upgrade.",
|
||||
title="ConnectorBreakingChanges",
|
||||
)
|
||||
|
||||
|
||||
@@ -230,11 +229,82 @@ class ConnectorRegistryDestinationDefinition(BaseModel):
|
||||
description="an optional flag indicating whether DBT is used in the normalization. If the flag value is NULL - DBT is not used.",
|
||||
)
|
||||
allowedHosts: Optional[AllowedHosts] = None
|
||||
releases: Optional[ConnectorReleases] = None
|
||||
releases: Optional[ConnectorRegistryReleases] = None
|
||||
ab_internal: Optional[AirbyteInternal] = None
|
||||
supportsRefreshes: Optional[bool] = False
|
||||
generated: Optional[GeneratedFields] = None
|
||||
packageInfo: Optional[ConnectorPackageInfo] = None
|
||||
language: Optional[str] = Field(
|
||||
None, description="The language the connector is written in"
|
||||
language: Optional[str] = Field(None, description="The language the connector is written in")
|
||||
|
||||
|
||||
class ConnectorRegistryReleases(BaseModel):
|
||||
class Config:
|
||||
extra = Extra.forbid
|
||||
|
||||
releaseCandidates: Optional[ConnectorReleaseCandidates] = None
|
||||
rolloutConfiguration: Optional[RolloutConfiguration] = None
|
||||
breakingChanges: Optional[ConnectorBreakingChanges] = None
|
||||
migrationDocumentationUrl: Optional[AnyUrl] = Field(
|
||||
None,
|
||||
description="URL to documentation on how to migrate from the previous version to the current version. Defaults to ${documentationUrl}-migrations",
|
||||
)
|
||||
|
||||
|
||||
class ConnectorReleaseCandidates(BaseModel):
|
||||
class Config:
|
||||
extra = Extra.forbid
|
||||
|
||||
__root__: Dict[constr(regex=r"^\d+\.\d+\.\d+$"), VersionReleaseCandidate] = Field(
|
||||
..., description="Each entry denotes a release candidate version of a connector."
|
||||
)
|
||||
|
||||
|
||||
class VersionReleaseCandidate(BaseModel):
|
||||
class Config:
|
||||
extra = Extra.forbid
|
||||
|
||||
__root__: Union[ConnectorRegistrySourceDefinition, ConnectorRegistryDestinationDefinition] = Field(
|
||||
..., description="Contains information about a release candidate version of a connector."
|
||||
)
|
||||
|
||||
|
||||
class ConnectorRegistrySourceDefinition(BaseModel):
|
||||
class Config:
|
||||
extra = Extra.allow
|
||||
|
||||
sourceDefinitionId: UUID
|
||||
name: str
|
||||
dockerRepository: str
|
||||
dockerImageTag: str
|
||||
documentationUrl: str
|
||||
icon: Optional[str] = None
|
||||
iconUrl: Optional[str] = None
|
||||
sourceType: Optional[Literal["api", "file", "database", "custom"]] = None
|
||||
spec: Dict[str, Any]
|
||||
tombstone: Optional[bool] = Field(
|
||||
False, description="if false, the configuration is active. if true, then this configuration is permanently off."
|
||||
)
|
||||
public: Optional[bool] = Field(False, description="true if this connector definition is available to all workspaces")
|
||||
custom: Optional[bool] = Field(False, description="whether this is a custom connector definition")
|
||||
releaseStage: Optional[ReleaseStage] = None
|
||||
supportLevel: Optional[SupportLevel] = None
|
||||
releaseDate: Optional[date] = Field(None, description="The date when this connector was first released, in yyyy-mm-dd format.")
|
||||
resourceRequirements: Optional[ActorDefinitionResourceRequirements] = None
|
||||
protocolVersion: Optional[str] = Field(None, description="the Airbyte Protocol version supported by the connector")
|
||||
allowedHosts: Optional[AllowedHosts] = None
|
||||
suggestedStreams: Optional[SuggestedStreams] = None
|
||||
maxSecondsBetweenMessages: Optional[int] = Field(
|
||||
None, description="Number of seconds allowed between 2 airbyte protocol messages. The source will timeout if this delay is reach"
|
||||
)
|
||||
erdUrl: Optional[str] = Field(None, description="The URL where you can visualize the ERD")
|
||||
releases: Optional[ConnectorRegistryReleases] = None
|
||||
ab_internal: Optional[AirbyteInternal] = None
|
||||
generated: Optional[GeneratedFields] = None
|
||||
packageInfo: Optional[ConnectorPackageInfo] = None
|
||||
language: Optional[str] = Field(None, description="The language the connector is written in")
|
||||
|
||||
|
||||
ConnectorRegistryDestinationDefinition.update_forward_refs()
|
||||
ConnectorRegistryReleases.update_forward_refs()
|
||||
ConnectorReleaseCandidates.update_forward_refs()
|
||||
VersionReleaseCandidate.update_forward_refs()
|
||||
|
||||
@@ -0,0 +1,309 @@
|
||||
# generated by datamodel-codegen:
|
||||
# filename: ConnectorRegistryReleases.yaml
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from datetime import date, datetime
|
||||
from typing import Any, Dict, List, Optional, Union
|
||||
from uuid import UUID
|
||||
|
||||
from pydantic import AnyUrl, BaseModel, Extra, Field, conint, constr
|
||||
from typing_extensions import Literal
|
||||
|
||||
|
||||
class RolloutConfiguration(BaseModel):
|
||||
class Config:
|
||||
extra = Extra.forbid
|
||||
|
||||
initialPercentage: Optional[conint(ge=0, le=100)] = Field(
|
||||
0, description="The percentage of users that should receive the new version initially."
|
||||
)
|
||||
maxPercentage: Optional[conint(ge=0, le=100)] = Field(
|
||||
50, description="The percentage of users who should receive the release candidate during the test phase before full rollout."
|
||||
)
|
||||
advanceDelayMinutes: Optional[conint(ge=10)] = Field(
|
||||
10, description="The number of minutes to wait before advancing the rollout percentage."
|
||||
)
|
||||
|
||||
|
||||
class StreamBreakingChangeScope(BaseModel):
|
||||
class Config:
|
||||
extra = Extra.forbid
|
||||
|
||||
scopeType: Any = Field("stream", const=True)
|
||||
impactedScopes: List[str] = Field(..., description="List of streams that are impacted by the breaking change.", min_items=1)
|
||||
|
||||
|
||||
class ReleaseStage(BaseModel):
|
||||
__root__: Literal["alpha", "beta", "generally_available", "custom"] = Field(
|
||||
..., description="enum that describes a connector's release stage", title="ReleaseStage"
|
||||
)
|
||||
|
||||
|
||||
class SupportLevel(BaseModel):
|
||||
__root__: Literal["community", "certified", "archived"] = Field(
|
||||
..., description="enum that describes a connector's release stage", title="SupportLevel"
|
||||
)
|
||||
|
||||
|
||||
class ResourceRequirements(BaseModel):
|
||||
class Config:
|
||||
extra = Extra.forbid
|
||||
|
||||
cpu_request: Optional[str] = None
|
||||
cpu_limit: Optional[str] = None
|
||||
memory_request: Optional[str] = None
|
||||
memory_limit: Optional[str] = None
|
||||
|
||||
|
||||
class JobType(BaseModel):
|
||||
__root__: Literal["get_spec", "check_connection", "discover_schema", "sync", "reset_connection", "connection_updater", "replicate"] = (
|
||||
Field(..., description="enum that describes the different types of jobs that the platform runs.", title="JobType")
|
||||
)
|
||||
|
||||
|
||||
class AllowedHosts(BaseModel):
|
||||
class Config:
|
||||
extra = Extra.allow
|
||||
|
||||
hosts: Optional[List[str]] = Field(
|
||||
None,
|
||||
description="An array of hosts that this connector can connect to. AllowedHosts not being present for the source or destination means that access to all hosts is allowed. An empty list here means that no network access is granted.",
|
||||
)
|
||||
|
||||
|
||||
class SuggestedStreams(BaseModel):
|
||||
class Config:
|
||||
extra = Extra.allow
|
||||
|
||||
streams: Optional[List[str]] = Field(
|
||||
None,
|
||||
description="An array of streams that this connector suggests the average user will want. SuggestedStreams not being present for the source means that all streams are suggested. An empty list here means that no streams are suggested.",
|
||||
)
|
||||
|
||||
|
||||
class AirbyteInternal(BaseModel):
|
||||
class Config:
|
||||
extra = Extra.allow
|
||||
|
||||
sl: Optional[Literal[100, 200, 300]] = None
|
||||
ql: Optional[Literal[100, 200, 300, 400, 500, 600]] = None
|
||||
|
||||
|
||||
class GitInfo(BaseModel):
|
||||
class Config:
|
||||
extra = Extra.forbid
|
||||
|
||||
commit_sha: Optional[str] = Field(None, description="The git commit sha of the last commit that modified this file.")
|
||||
commit_timestamp: Optional[datetime] = Field(None, description="The git commit timestamp of the last commit that modified this file.")
|
||||
commit_author: Optional[str] = Field(None, description="The git commit author of the last commit that modified this file.")
|
||||
commit_author_email: Optional[str] = Field(None, description="The git commit author email of the last commit that modified this file.")
|
||||
|
||||
|
||||
class SourceFileInfo(BaseModel):
|
||||
metadata_etag: Optional[str] = None
|
||||
metadata_file_path: Optional[str] = None
|
||||
metadata_bucket_name: Optional[str] = None
|
||||
metadata_last_modified: Optional[str] = None
|
||||
registry_entry_generated_at: Optional[str] = None
|
||||
|
||||
|
||||
class ConnectorMetrics(BaseModel):
|
||||
all: Optional[Any] = None
|
||||
cloud: Optional[Any] = None
|
||||
oss: Optional[Any] = None
|
||||
|
||||
|
||||
class ConnectorMetric(BaseModel):
|
||||
class Config:
|
||||
extra = Extra.allow
|
||||
|
||||
usage: Optional[Union[str, Literal["low", "medium", "high"]]] = None
|
||||
sync_success_rate: Optional[Union[str, Literal["low", "medium", "high"]]] = None
|
||||
connector_version: Optional[str] = None
|
||||
|
||||
|
||||
class ConnectorPackageInfo(BaseModel):
|
||||
cdk_version: Optional[str] = None
|
||||
|
||||
|
||||
class NormalizationDestinationDefinitionConfig(BaseModel):
|
||||
class Config:
|
||||
extra = Extra.allow
|
||||
|
||||
normalizationRepository: str = Field(
|
||||
...,
|
||||
description="a field indicating the name of the repository to be used for normalization. If the value of the flag is NULL - normalization is not used.",
|
||||
)
|
||||
normalizationTag: str = Field(..., description="a field indicating the tag of the docker repository to be used for normalization.")
|
||||
normalizationIntegrationType: str = Field(
|
||||
..., description="a field indicating the type of integration dialect to use for normalization."
|
||||
)
|
||||
|
||||
|
||||
class BreakingChangeScope(BaseModel):
|
||||
__root__: StreamBreakingChangeScope = Field(..., description="A scope that can be used to limit the impact of a breaking change.")
|
||||
|
||||
|
||||
class JobTypeResourceLimit(BaseModel):
|
||||
class Config:
|
||||
extra = Extra.forbid
|
||||
|
||||
jobType: JobType
|
||||
resourceRequirements: ResourceRequirements
|
||||
|
||||
|
||||
class GeneratedFields(BaseModel):
|
||||
git: Optional[GitInfo] = None
|
||||
source_file_info: Optional[SourceFileInfo] = None
|
||||
metrics: Optional[ConnectorMetrics] = None
|
||||
sbomUrl: Optional[str] = Field(None, description="URL to the SBOM file")
|
||||
|
||||
|
||||
class VersionBreakingChange(BaseModel):
|
||||
class Config:
|
||||
extra = Extra.forbid
|
||||
|
||||
upgradeDeadline: date = Field(..., description="The deadline by which to upgrade before the breaking change takes effect.")
|
||||
message: str = Field(..., description="Descriptive message detailing the breaking change.")
|
||||
migrationDocumentationUrl: Optional[AnyUrl] = Field(
|
||||
None,
|
||||
description="URL to documentation on how to migrate to the current version. Defaults to ${documentationUrl}-migrations#${version}",
|
||||
)
|
||||
scopedImpact: Optional[List[BreakingChangeScope]] = Field(
|
||||
None,
|
||||
description="List of scopes that are impacted by the breaking change. If not specified, the breaking change cannot be scoped to reduce impact via the supported scope types.",
|
||||
min_items=1,
|
||||
)
|
||||
|
||||
|
||||
class ActorDefinitionResourceRequirements(BaseModel):
|
||||
class Config:
|
||||
extra = Extra.forbid
|
||||
|
||||
default: Optional[ResourceRequirements] = Field(
|
||||
None, description="if set, these are the requirements that should be set for ALL jobs run for this actor definition."
|
||||
)
|
||||
jobSpecific: Optional[List[JobTypeResourceLimit]] = None
|
||||
|
||||
|
||||
class ConnectorBreakingChanges(BaseModel):
|
||||
class Config:
|
||||
extra = Extra.forbid
|
||||
|
||||
__root__: Dict[constr(regex=r"^\d+\.\d+\.\d+$"), VersionBreakingChange] = Field(
|
||||
...,
|
||||
description="Each entry denotes a breaking change in a specific version of a connector that requires user action to upgrade.",
|
||||
title="ConnectorBreakingChanges",
|
||||
)
|
||||
|
||||
|
||||
class ConnectorRegistryReleases(BaseModel):
|
||||
class Config:
|
||||
extra = Extra.forbid
|
||||
|
||||
releaseCandidates: Optional[ConnectorReleaseCandidates] = None
|
||||
rolloutConfiguration: Optional[RolloutConfiguration] = None
|
||||
breakingChanges: Optional[ConnectorBreakingChanges] = None
|
||||
migrationDocumentationUrl: Optional[AnyUrl] = Field(
|
||||
None,
|
||||
description="URL to documentation on how to migrate from the previous version to the current version. Defaults to ${documentationUrl}-migrations",
|
||||
)
|
||||
|
||||
|
||||
class ConnectorReleaseCandidates(BaseModel):
|
||||
class Config:
|
||||
extra = Extra.forbid
|
||||
|
||||
__root__: Dict[constr(regex=r"^\d+\.\d+\.\d+$"), VersionReleaseCandidate] = Field(
|
||||
..., description="Each entry denotes a release candidate version of a connector."
|
||||
)
|
||||
|
||||
|
||||
class VersionReleaseCandidate(BaseModel):
|
||||
class Config:
|
||||
extra = Extra.forbid
|
||||
|
||||
__root__: Union[ConnectorRegistrySourceDefinition, ConnectorRegistryDestinationDefinition] = Field(
|
||||
..., description="Contains information about a release candidate version of a connector."
|
||||
)
|
||||
|
||||
|
||||
class ConnectorRegistrySourceDefinition(BaseModel):
|
||||
class Config:
|
||||
extra = Extra.allow
|
||||
|
||||
sourceDefinitionId: UUID
|
||||
name: str
|
||||
dockerRepository: str
|
||||
dockerImageTag: str
|
||||
documentationUrl: str
|
||||
icon: Optional[str] = None
|
||||
iconUrl: Optional[str] = None
|
||||
sourceType: Optional[Literal["api", "file", "database", "custom"]] = None
|
||||
spec: Dict[str, Any]
|
||||
tombstone: Optional[bool] = Field(
|
||||
False, description="if false, the configuration is active. if true, then this configuration is permanently off."
|
||||
)
|
||||
public: Optional[bool] = Field(False, description="true if this connector definition is available to all workspaces")
|
||||
custom: Optional[bool] = Field(False, description="whether this is a custom connector definition")
|
||||
releaseStage: Optional[ReleaseStage] = None
|
||||
supportLevel: Optional[SupportLevel] = None
|
||||
releaseDate: Optional[date] = Field(None, description="The date when this connector was first released, in yyyy-mm-dd format.")
|
||||
resourceRequirements: Optional[ActorDefinitionResourceRequirements] = None
|
||||
protocolVersion: Optional[str] = Field(None, description="the Airbyte Protocol version supported by the connector")
|
||||
allowedHosts: Optional[AllowedHosts] = None
|
||||
suggestedStreams: Optional[SuggestedStreams] = None
|
||||
maxSecondsBetweenMessages: Optional[int] = Field(
|
||||
None, description="Number of seconds allowed between 2 airbyte protocol messages. The source will timeout if this delay is reach"
|
||||
)
|
||||
erdUrl: Optional[str] = Field(None, description="The URL where you can visualize the ERD")
|
||||
releases: Optional[ConnectorRegistryReleases] = None
|
||||
ab_internal: Optional[AirbyteInternal] = None
|
||||
generated: Optional[GeneratedFields] = None
|
||||
packageInfo: Optional[ConnectorPackageInfo] = None
|
||||
language: Optional[str] = Field(None, description="The language the connector is written in")
|
||||
|
||||
|
||||
class ConnectorRegistryDestinationDefinition(BaseModel):
|
||||
class Config:
|
||||
extra = Extra.allow
|
||||
|
||||
destinationDefinitionId: UUID
|
||||
name: str
|
||||
dockerRepository: str
|
||||
dockerImageTag: str
|
||||
documentationUrl: str
|
||||
icon: Optional[str] = None
|
||||
iconUrl: Optional[str] = None
|
||||
spec: Dict[str, Any]
|
||||
tombstone: Optional[bool] = Field(
|
||||
False, description="if false, the configuration is active. if true, then this configuration is permanently off."
|
||||
)
|
||||
public: Optional[bool] = Field(False, description="true if this connector definition is available to all workspaces")
|
||||
custom: Optional[bool] = Field(False, description="whether this is a custom connector definition")
|
||||
releaseStage: Optional[ReleaseStage] = None
|
||||
supportLevel: Optional[SupportLevel] = None
|
||||
releaseDate: Optional[date] = Field(None, description="The date when this connector was first released, in yyyy-mm-dd format.")
|
||||
tags: Optional[List[str]] = Field(
|
||||
None, description="An array of tags that describe the connector. E.g: language:python, keyword:rds, etc."
|
||||
)
|
||||
resourceRequirements: Optional[ActorDefinitionResourceRequirements] = None
|
||||
protocolVersion: Optional[str] = Field(None, description="the Airbyte Protocol version supported by the connector")
|
||||
normalizationConfig: Optional[NormalizationDestinationDefinitionConfig] = None
|
||||
supportsDbt: Optional[bool] = Field(
|
||||
None,
|
||||
description="an optional flag indicating whether DBT is used in the normalization. If the flag value is NULL - DBT is not used.",
|
||||
)
|
||||
allowedHosts: Optional[AllowedHosts] = None
|
||||
releases: Optional[ConnectorRegistryReleases] = None
|
||||
ab_internal: Optional[AirbyteInternal] = None
|
||||
supportsRefreshes: Optional[bool] = False
|
||||
generated: Optional[GeneratedFields] = None
|
||||
packageInfo: Optional[ConnectorPackageInfo] = None
|
||||
language: Optional[str] = Field(None, description="The language the connector is written in")
|
||||
|
||||
|
||||
ConnectorRegistryReleases.update_forward_refs()
|
||||
ConnectorReleaseCandidates.update_forward_refs()
|
||||
VersionReleaseCandidate.update_forward_refs()
|
||||
@@ -82,6 +82,20 @@ class StreamBreakingChangeScope(BaseModel):
|
||||
impactedScopes: List[str] = Field(..., description="List of streams that are impacted by the breaking change.", min_items=1)
|
||||
|
||||
|
||||
class NormalizationDestinationDefinitionConfig(BaseModel):
|
||||
class Config:
|
||||
extra = Extra.allow
|
||||
|
||||
normalizationRepository: str = Field(
|
||||
...,
|
||||
description="a field indicating the name of the repository to be used for normalization. If the value of the flag is NULL - normalization is not used.",
|
||||
)
|
||||
normalizationTag: str = Field(..., description="a field indicating the tag of the docker repository to be used for normalization.")
|
||||
normalizationIntegrationType: str = Field(
|
||||
..., description="a field indicating the type of integration dialect to use for normalization."
|
||||
)
|
||||
|
||||
|
||||
class AirbyteInternal(BaseModel):
|
||||
class Config:
|
||||
extra = Extra.allow
|
||||
@@ -178,20 +192,9 @@ class ConnectorBreakingChanges(BaseModel):
|
||||
extra = Extra.forbid
|
||||
|
||||
__root__: Dict[constr(regex=r"^\d+\.\d+\.\d+$"), VersionBreakingChange] = Field(
|
||||
..., description="Each entry denotes a breaking change in a specific version of a connector that requires user action to upgrade."
|
||||
)
|
||||
|
||||
|
||||
class ConnectorReleases(BaseModel):
|
||||
class Config:
|
||||
extra = Extra.forbid
|
||||
|
||||
isReleaseCandidate: Optional[bool] = Field(False, description="Whether the release is eligible to be a release candidate.")
|
||||
rolloutConfiguration: Optional[RolloutConfiguration] = None
|
||||
breakingChanges: ConnectorBreakingChanges
|
||||
migrationDocumentationUrl: Optional[AnyUrl] = Field(
|
||||
None,
|
||||
description="URL to documentation on how to migrate from the previous version to the current version. Defaults to ${documentationUrl}-migrations",
|
||||
...,
|
||||
description="Each entry denotes a breaking change in a specific version of a connector that requires user action to upgrade.",
|
||||
title="ConnectorBreakingChanges",
|
||||
)
|
||||
|
||||
|
||||
@@ -224,10 +227,84 @@ class ConnectorRegistrySourceDefinition(BaseModel):
|
||||
None, description="Number of seconds allowed between 2 airbyte protocol messages. The source will timeout if this delay is reach"
|
||||
)
|
||||
erdUrl: Optional[str] = Field(None, description="The URL where you can visualize the ERD")
|
||||
releases: Optional[ConnectorReleases] = None
|
||||
releases: Optional[ConnectorRegistryReleases] = None
|
||||
ab_internal: Optional[AirbyteInternal] = None
|
||||
generated: Optional[GeneratedFields] = None
|
||||
packageInfo: Optional[ConnectorPackageInfo] = None
|
||||
language: Optional[str] = Field(
|
||||
None, description="The language the connector is written in"
|
||||
language: Optional[str] = Field(None, description="The language the connector is written in")
|
||||
|
||||
|
||||
class ConnectorRegistryReleases(BaseModel):
|
||||
class Config:
|
||||
extra = Extra.forbid
|
||||
|
||||
releaseCandidates: Optional[ConnectorReleaseCandidates] = None
|
||||
rolloutConfiguration: Optional[RolloutConfiguration] = None
|
||||
breakingChanges: Optional[ConnectorBreakingChanges] = None
|
||||
migrationDocumentationUrl: Optional[AnyUrl] = Field(
|
||||
None,
|
||||
description="URL to documentation on how to migrate from the previous version to the current version. Defaults to ${documentationUrl}-migrations",
|
||||
)
|
||||
|
||||
|
||||
class ConnectorReleaseCandidates(BaseModel):
|
||||
class Config:
|
||||
extra = Extra.forbid
|
||||
|
||||
__root__: Dict[constr(regex=r"^\d+\.\d+\.\d+$"), VersionReleaseCandidate] = Field(
|
||||
..., description="Each entry denotes a release candidate version of a connector."
|
||||
)
|
||||
|
||||
|
||||
class VersionReleaseCandidate(BaseModel):
|
||||
class Config:
|
||||
extra = Extra.forbid
|
||||
|
||||
__root__: Union[ConnectorRegistrySourceDefinition, ConnectorRegistryDestinationDefinition] = Field(
|
||||
..., description="Contains information about a release candidate version of a connector."
|
||||
)
|
||||
|
||||
|
||||
class ConnectorRegistryDestinationDefinition(BaseModel):
|
||||
class Config:
|
||||
extra = Extra.allow
|
||||
|
||||
destinationDefinitionId: UUID
|
||||
name: str
|
||||
dockerRepository: str
|
||||
dockerImageTag: str
|
||||
documentationUrl: str
|
||||
icon: Optional[str] = None
|
||||
iconUrl: Optional[str] = None
|
||||
spec: Dict[str, Any]
|
||||
tombstone: Optional[bool] = Field(
|
||||
False, description="if false, the configuration is active. if true, then this configuration is permanently off."
|
||||
)
|
||||
public: Optional[bool] = Field(False, description="true if this connector definition is available to all workspaces")
|
||||
custom: Optional[bool] = Field(False, description="whether this is a custom connector definition")
|
||||
releaseStage: Optional[ReleaseStage] = None
|
||||
supportLevel: Optional[SupportLevel] = None
|
||||
releaseDate: Optional[date] = Field(None, description="The date when this connector was first released, in yyyy-mm-dd format.")
|
||||
tags: Optional[List[str]] = Field(
|
||||
None, description="An array of tags that describe the connector. E.g: language:python, keyword:rds, etc."
|
||||
)
|
||||
resourceRequirements: Optional[ActorDefinitionResourceRequirements] = None
|
||||
protocolVersion: Optional[str] = Field(None, description="the Airbyte Protocol version supported by the connector")
|
||||
normalizationConfig: Optional[NormalizationDestinationDefinitionConfig] = None
|
||||
supportsDbt: Optional[bool] = Field(
|
||||
None,
|
||||
description="an optional flag indicating whether DBT is used in the normalization. If the flag value is NULL - DBT is not used.",
|
||||
)
|
||||
allowedHosts: Optional[AllowedHosts] = None
|
||||
releases: Optional[ConnectorRegistryReleases] = None
|
||||
ab_internal: Optional[AirbyteInternal] = None
|
||||
supportsRefreshes: Optional[bool] = False
|
||||
generated: Optional[GeneratedFields] = None
|
||||
packageInfo: Optional[ConnectorPackageInfo] = None
|
||||
language: Optional[str] = Field(None, description="The language the connector is written in")
|
||||
|
||||
|
||||
ConnectorRegistrySourceDefinition.update_forward_refs()
|
||||
ConnectorRegistryReleases.update_forward_refs()
|
||||
ConnectorReleaseCandidates.update_forward_refs()
|
||||
VersionReleaseCandidate.update_forward_refs()
|
||||
|
||||
@@ -86,6 +86,16 @@ class StreamBreakingChangeScope(BaseModel):
|
||||
impactedScopes: List[str] = Field(..., description="List of streams that are impacted by the breaking change.", min_items=1)
|
||||
|
||||
|
||||
class SuggestedStreams(BaseModel):
|
||||
class Config:
|
||||
extra = Extra.allow
|
||||
|
||||
streams: Optional[List[str]] = Field(
|
||||
None,
|
||||
description="An array of streams that this connector suggests the average user will want. SuggestedStreams not being present for the source means that all streams are suggested. An empty list here means that no streams are suggested.",
|
||||
)
|
||||
|
||||
|
||||
class AirbyteInternal(BaseModel):
|
||||
class Config:
|
||||
extra = Extra.allow
|
||||
@@ -131,16 +141,6 @@ class ConnectorPackageInfo(BaseModel):
|
||||
cdk_version: Optional[str] = None
|
||||
|
||||
|
||||
class SuggestedStreams(BaseModel):
|
||||
class Config:
|
||||
extra = Extra.allow
|
||||
|
||||
streams: Optional[List[str]] = Field(
|
||||
None,
|
||||
description="An array of streams that this connector suggests the average user will want. SuggestedStreams not being present for the source means that all streams are suggested. An empty list here means that no streams are suggested.",
|
||||
)
|
||||
|
||||
|
||||
class JobTypeResourceLimit(BaseModel):
|
||||
class Config:
|
||||
extra = Extra.forbid
|
||||
@@ -192,59 +192,15 @@ class ConnectorBreakingChanges(BaseModel):
|
||||
extra = Extra.forbid
|
||||
|
||||
__root__: Dict[constr(regex=r"^\d+\.\d+\.\d+$"), VersionBreakingChange] = Field(
|
||||
..., description="Each entry denotes a breaking change in a specific version of a connector that requires user action to upgrade."
|
||||
...,
|
||||
description="Each entry denotes a breaking change in a specific version of a connector that requires user action to upgrade.",
|
||||
title="ConnectorBreakingChanges",
|
||||
)
|
||||
|
||||
|
||||
class ConnectorReleases(BaseModel):
|
||||
class Config:
|
||||
extra = Extra.forbid
|
||||
|
||||
isReleaseCandidate: Optional[bool] = Field(False, description="Whether the release is eligible to be a release candidate.")
|
||||
rolloutConfiguration: Optional[RolloutConfiguration] = None
|
||||
breakingChanges: ConnectorBreakingChanges
|
||||
migrationDocumentationUrl: Optional[AnyUrl] = Field(
|
||||
None,
|
||||
description="URL to documentation on how to migrate from the previous version to the current version. Defaults to ${documentationUrl}-migrations",
|
||||
)
|
||||
|
||||
|
||||
class ConnectorRegistrySourceDefinition(BaseModel):
|
||||
class Config:
|
||||
extra = Extra.allow
|
||||
|
||||
sourceDefinitionId: UUID
|
||||
name: str
|
||||
dockerRepository: str
|
||||
dockerImageTag: str
|
||||
documentationUrl: str
|
||||
icon: Optional[str] = None
|
||||
iconUrl: Optional[str] = None
|
||||
sourceType: Optional[Literal["api", "file", "database", "custom"]] = None
|
||||
spec: Dict[str, Any]
|
||||
tombstone: Optional[bool] = Field(
|
||||
False, description="if false, the configuration is active. if true, then this configuration is permanently off."
|
||||
)
|
||||
public: Optional[bool] = Field(False, description="true if this connector definition is available to all workspaces")
|
||||
custom: Optional[bool] = Field(False, description="whether this is a custom connector definition")
|
||||
releaseStage: Optional[ReleaseStage] = None
|
||||
supportLevel: Optional[SupportLevel] = None
|
||||
releaseDate: Optional[date] = Field(None, description="The date when this connector was first released, in yyyy-mm-dd format.")
|
||||
resourceRequirements: Optional[ActorDefinitionResourceRequirements] = None
|
||||
protocolVersion: Optional[str] = Field(None, description="the Airbyte Protocol version supported by the connector")
|
||||
allowedHosts: Optional[AllowedHosts] = None
|
||||
suggestedStreams: Optional[SuggestedStreams] = None
|
||||
maxSecondsBetweenMessages: Optional[int] = Field(
|
||||
None, description="Number of seconds allowed between 2 airbyte protocol messages. The source will timeout if this delay is reach"
|
||||
)
|
||||
erdUrl: Optional[str] = Field(None, description="The URL where you can visualize the ERD")
|
||||
releases: Optional[ConnectorReleases] = None
|
||||
ab_internal: Optional[AirbyteInternal] = None
|
||||
generated: Optional[GeneratedFields] = None
|
||||
packageInfo: Optional[ConnectorPackageInfo] = None
|
||||
language: Optional[str] = Field(
|
||||
None, description="The language the connector is written in"
|
||||
)
|
||||
class ConnectorRegistryV0(BaseModel):
|
||||
destinations: List[ConnectorRegistryDestinationDefinition]
|
||||
sources: List[ConnectorRegistrySourceDefinition]
|
||||
|
||||
|
||||
class ConnectorRegistryDestinationDefinition(BaseModel):
|
||||
@@ -278,16 +234,83 @@ class ConnectorRegistryDestinationDefinition(BaseModel):
|
||||
description="an optional flag indicating whether DBT is used in the normalization. If the flag value is NULL - DBT is not used.",
|
||||
)
|
||||
allowedHosts: Optional[AllowedHosts] = None
|
||||
releases: Optional[ConnectorReleases] = None
|
||||
releases: Optional[ConnectorRegistryReleases] = None
|
||||
ab_internal: Optional[AirbyteInternal] = None
|
||||
supportsRefreshes: Optional[bool] = False
|
||||
generated: Optional[GeneratedFields] = None
|
||||
packageInfo: Optional[ConnectorPackageInfo] = None
|
||||
language: Optional[str] = Field(
|
||||
None, description="The language the connector is written in"
|
||||
language: Optional[str] = Field(None, description="The language the connector is written in")
|
||||
|
||||
|
||||
class ConnectorRegistryReleases(BaseModel):
|
||||
class Config:
|
||||
extra = Extra.forbid
|
||||
|
||||
releaseCandidates: Optional[ConnectorReleaseCandidates] = None
|
||||
rolloutConfiguration: Optional[RolloutConfiguration] = None
|
||||
breakingChanges: Optional[ConnectorBreakingChanges] = None
|
||||
migrationDocumentationUrl: Optional[AnyUrl] = Field(
|
||||
None,
|
||||
description="URL to documentation on how to migrate from the previous version to the current version. Defaults to ${documentationUrl}-migrations",
|
||||
)
|
||||
|
||||
|
||||
class ConnectorRegistryV0(BaseModel):
|
||||
destinations: List[ConnectorRegistryDestinationDefinition]
|
||||
sources: List[ConnectorRegistrySourceDefinition]
|
||||
class ConnectorReleaseCandidates(BaseModel):
|
||||
class Config:
|
||||
extra = Extra.forbid
|
||||
|
||||
__root__: Dict[constr(regex=r"^\d+\.\d+\.\d+$"), VersionReleaseCandidate] = Field(
|
||||
..., description="Each entry denotes a release candidate version of a connector."
|
||||
)
|
||||
|
||||
|
||||
class VersionReleaseCandidate(BaseModel):
|
||||
class Config:
|
||||
extra = Extra.forbid
|
||||
|
||||
__root__: Union[ConnectorRegistrySourceDefinition, ConnectorRegistryDestinationDefinition] = Field(
|
||||
..., description="Contains information about a release candidate version of a connector."
|
||||
)
|
||||
|
||||
|
||||
class ConnectorRegistrySourceDefinition(BaseModel):
|
||||
class Config:
|
||||
extra = Extra.allow
|
||||
|
||||
sourceDefinitionId: UUID
|
||||
name: str
|
||||
dockerRepository: str
|
||||
dockerImageTag: str
|
||||
documentationUrl: str
|
||||
icon: Optional[str] = None
|
||||
iconUrl: Optional[str] = None
|
||||
sourceType: Optional[Literal["api", "file", "database", "custom"]] = None
|
||||
spec: Dict[str, Any]
|
||||
tombstone: Optional[bool] = Field(
|
||||
False, description="if false, the configuration is active. if true, then this configuration is permanently off."
|
||||
)
|
||||
public: Optional[bool] = Field(False, description="true if this connector definition is available to all workspaces")
|
||||
custom: Optional[bool] = Field(False, description="whether this is a custom connector definition")
|
||||
releaseStage: Optional[ReleaseStage] = None
|
||||
supportLevel: Optional[SupportLevel] = None
|
||||
releaseDate: Optional[date] = Field(None, description="The date when this connector was first released, in yyyy-mm-dd format.")
|
||||
resourceRequirements: Optional[ActorDefinitionResourceRequirements] = None
|
||||
protocolVersion: Optional[str] = Field(None, description="the Airbyte Protocol version supported by the connector")
|
||||
allowedHosts: Optional[AllowedHosts] = None
|
||||
suggestedStreams: Optional[SuggestedStreams] = None
|
||||
maxSecondsBetweenMessages: Optional[int] = Field(
|
||||
None, description="Number of seconds allowed between 2 airbyte protocol messages. The source will timeout if this delay is reach"
|
||||
)
|
||||
erdUrl: Optional[str] = Field(None, description="The URL where you can visualize the ERD")
|
||||
releases: Optional[ConnectorRegistryReleases] = None
|
||||
ab_internal: Optional[AirbyteInternal] = None
|
||||
generated: Optional[GeneratedFields] = None
|
||||
packageInfo: Optional[ConnectorPackageInfo] = None
|
||||
language: Optional[str] = Field(None, description="The language the connector is written in")
|
||||
|
||||
|
||||
ConnectorRegistryV0.update_forward_refs()
|
||||
ConnectorRegistryDestinationDefinition.update_forward_refs()
|
||||
ConnectorRegistryReleases.update_forward_refs()
|
||||
ConnectorReleaseCandidates.update_forward_refs()
|
||||
VersionReleaseCandidate.update_forward_refs()
|
||||
|
||||
@@ -58,7 +58,9 @@ class ConnectorBreakingChanges(BaseModel):
|
||||
extra = Extra.forbid
|
||||
|
||||
__root__: Dict[constr(regex=r"^\d+\.\d+\.\d+$"), VersionBreakingChange] = Field(
|
||||
..., description="Each entry denotes a breaking change in a specific version of a connector that requires user action to upgrade."
|
||||
...,
|
||||
description="Each entry denotes a breaking change in a specific version of a connector that requires user action to upgrade.",
|
||||
title="ConnectorBreakingChanges",
|
||||
)
|
||||
|
||||
|
||||
|
||||
@@ -2,11 +2,13 @@
|
||||
from .ActorDefinitionResourceRequirements import *
|
||||
from .AirbyteInternal import *
|
||||
from .AllowedHosts import *
|
||||
from .ConnectorBreakingChanges import *
|
||||
from .ConnectorBuildOptions import *
|
||||
from .ConnectorMetadataDefinitionV0 import *
|
||||
from .ConnectorMetrics import *
|
||||
from .ConnectorPackageInfo import *
|
||||
from .ConnectorRegistryDestinationDefinition import *
|
||||
from .ConnectorRegistryReleases import *
|
||||
from .ConnectorRegistrySourceDefinition import *
|
||||
from .ConnectorRegistryV0 import *
|
||||
from .ConnectorReleases import *
|
||||
|
||||
@@ -0,0 +1,59 @@
|
||||
---
|
||||
"$schema": http://json-schema.org/draft-07/schema#
|
||||
"$id": https://github.com/airbytehq/airbyte/airbyte-ci/connectors/metadata_service/lib/metadata_service/models/src/ConnectorBreakingChanges.yaml
|
||||
title: ConnectorBreakingChanges
|
||||
description: Each entry denotes a breaking change in a specific version of a connector that requires user action to upgrade.
|
||||
type: object
|
||||
additionalProperties: false
|
||||
minProperties: 1
|
||||
patternProperties:
|
||||
"^\\d+\\.\\d+\\.\\d+$":
|
||||
$ref: "#/definitions/VersionBreakingChange"
|
||||
definitions:
|
||||
VersionBreakingChange:
|
||||
description: Contains information about a breaking change, including the deadline to upgrade and a message detailing the change.
|
||||
type: object
|
||||
additionalProperties: false
|
||||
required:
|
||||
- upgradeDeadline
|
||||
- message
|
||||
properties:
|
||||
upgradeDeadline:
|
||||
description: The deadline by which to upgrade before the breaking change takes effect.
|
||||
type: string
|
||||
format: date
|
||||
message:
|
||||
description: Descriptive message detailing the breaking change.
|
||||
type: string
|
||||
migrationDocumentationUrl:
|
||||
description: URL to documentation on how to migrate to the current version. Defaults to ${documentationUrl}-migrations#${version}
|
||||
type: string
|
||||
format: uri
|
||||
scopedImpact:
|
||||
description: List of scopes that are impacted by the breaking change. If not specified, the breaking change cannot be scoped to reduce impact via the supported scope types.
|
||||
type: array
|
||||
minItems: 1
|
||||
items:
|
||||
$ref: "#/definitions/BreakingChangeScope"
|
||||
BreakingChangeScope:
|
||||
description: A scope that can be used to limit the impact of a breaking change.
|
||||
type: object
|
||||
oneOf:
|
||||
- $ref: "#/definitions/StreamBreakingChangeScope"
|
||||
StreamBreakingChangeScope:
|
||||
description: A scope that can be used to limit the impact of a breaking change to specific streams.
|
||||
type: object
|
||||
additionalProperties: false
|
||||
required:
|
||||
- scopeType
|
||||
- impactedScopes
|
||||
properties:
|
||||
scopeType:
|
||||
type: const
|
||||
const: stream
|
||||
impactedScopes:
|
||||
description: List of streams that are impacted by the breaking change.
|
||||
type: array
|
||||
minItems: 1
|
||||
items:
|
||||
type: string
|
||||
@@ -69,7 +69,7 @@ properties:
|
||||
allowedHosts:
|
||||
"$ref": AllowedHosts.yaml
|
||||
releases:
|
||||
"$ref": ConnectorReleases.yaml
|
||||
"$ref": ConnectorRegistryReleases.yaml
|
||||
ab_internal:
|
||||
"$ref": AirbyteInternal.yaml
|
||||
supportsRefreshes:
|
||||
|
||||
@@ -0,0 +1,35 @@
|
||||
---
|
||||
"$schema": http://json-schema.org/draft-07/schema#
|
||||
"$id": https://github.com/airbytehq/airbyte/airbyte-ci/connectors/metadata_service/lib/metadata_service/models/src/ConnectorRegistryReleases.yaml
|
||||
title: ConnectorRegistryReleases
|
||||
description: Contains information about different types of releases for a connector.
|
||||
type: object
|
||||
additionalProperties: false
|
||||
properties:
|
||||
releaseCandidates:
|
||||
$ref: "#/definitions/ConnectorReleaseCandidates"
|
||||
rolloutConfiguration:
|
||||
$ref: RolloutConfiguration.yaml
|
||||
breakingChanges:
|
||||
$ref: ConnectorBreakingChanges.yaml
|
||||
migrationDocumentationUrl:
|
||||
description: URL to documentation on how to migrate from the previous version to the current version. Defaults to ${documentationUrl}-migrations
|
||||
type: string
|
||||
format: uri
|
||||
definitions:
|
||||
ConnectorReleaseCandidates:
|
||||
description: Each entry denotes a release candidate version of a connector.
|
||||
type: object
|
||||
additionalProperties: false
|
||||
minProperties: 1
|
||||
maxProperties: 1
|
||||
patternProperties:
|
||||
"^\\d+\\.\\d+\\.\\d+$":
|
||||
$ref: "#/definitions/VersionReleaseCandidate"
|
||||
VersionReleaseCandidate:
|
||||
description: Contains information about a release candidate version of a connector.
|
||||
additionalProperties: false
|
||||
type: object
|
||||
oneOf:
|
||||
- $ref: ConnectorRegistrySourceDefinition.yaml
|
||||
- $ref: ConnectorRegistryDestinationDefinition.yaml
|
||||
@@ -74,7 +74,7 @@ properties:
|
||||
type: string
|
||||
description: The URL where you can visualize the ERD
|
||||
releases:
|
||||
"$ref": ConnectorReleases.yaml
|
||||
"$ref": ConnectorRegistryReleases.yaml
|
||||
ab_internal:
|
||||
"$ref": AirbyteInternal.yaml
|
||||
generated:
|
||||
|
||||
@@ -15,64 +15,8 @@ properties:
|
||||
rolloutConfiguration:
|
||||
$ref: RolloutConfiguration.yaml
|
||||
breakingChanges:
|
||||
$ref: "#/definitions/ConnectorBreakingChanges"
|
||||
$ref: ConnectorBreakingChanges.yaml
|
||||
migrationDocumentationUrl:
|
||||
description: URL to documentation on how to migrate from the previous version to the current version. Defaults to ${documentationUrl}-migrations
|
||||
type: string
|
||||
format: uri
|
||||
definitions:
|
||||
ConnectorBreakingChanges:
|
||||
description: Each entry denotes a breaking change in a specific version of a connector that requires user action to upgrade.
|
||||
type: object
|
||||
additionalProperties: false
|
||||
minProperties: 1
|
||||
patternProperties:
|
||||
"^\\d+\\.\\d+\\.\\d+$":
|
||||
$ref: "#/definitions/VersionBreakingChange"
|
||||
VersionBreakingChange:
|
||||
description: Contains information about a breaking change, including the deadline to upgrade and a message detailing the change.
|
||||
type: object
|
||||
additionalProperties: false
|
||||
required:
|
||||
- upgradeDeadline
|
||||
- message
|
||||
properties:
|
||||
upgradeDeadline:
|
||||
description: The deadline by which to upgrade before the breaking change takes effect.
|
||||
type: string
|
||||
format: date
|
||||
message:
|
||||
description: Descriptive message detailing the breaking change.
|
||||
type: string
|
||||
migrationDocumentationUrl:
|
||||
description: URL to documentation on how to migrate to the current version. Defaults to ${documentationUrl}-migrations#${version}
|
||||
type: string
|
||||
format: uri
|
||||
scopedImpact:
|
||||
description: List of scopes that are impacted by the breaking change. If not specified, the breaking change cannot be scoped to reduce impact via the supported scope types.
|
||||
type: array
|
||||
minItems: 1
|
||||
items:
|
||||
$ref: "#/definitions/BreakingChangeScope"
|
||||
BreakingChangeScope:
|
||||
description: A scope that can be used to limit the impact of a breaking change.
|
||||
type: object
|
||||
oneOf:
|
||||
- $ref: "#/definitions/StreamBreakingChangeScope"
|
||||
StreamBreakingChangeScope:
|
||||
description: A scope that can be used to limit the impact of a breaking change to specific streams.
|
||||
type: object
|
||||
additionalProperties: false
|
||||
required:
|
||||
- scopeType
|
||||
- impactedScopes
|
||||
properties:
|
||||
scopeType:
|
||||
type: const
|
||||
const: stream
|
||||
impactedScopes:
|
||||
description: List of streams that are impacted by the breaking change.
|
||||
type: array
|
||||
minItems: 1
|
||||
items:
|
||||
type: string
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
[tool.poetry]
|
||||
name = "metadata-service"
|
||||
version = "0.13.1"
|
||||
version = "0.14.0"
|
||||
description = ""
|
||||
authors = ["Ben Church <ben@airbyte.io>"]
|
||||
readme = "README.md"
|
||||
|
||||
@@ -94,7 +94,7 @@ METADATA_RESOURCE_TREE = {
|
||||
),
|
||||
"latest_metadata_file_blobs": gcs_directory_blobs.configured(
|
||||
{"gcs_bucket": {"env": "METADATA_BUCKET"}, "prefix": METADATA_FOLDER, "match_regex": f".*latest/{METADATA_FILE_NAME}$"}
|
||||
),
|
||||
)
|
||||
}
|
||||
|
||||
DATA_WAREHOUSE_RESOURCE_TREE = {
|
||||
@@ -125,6 +125,12 @@ REGISTRY_ENTRY_RESOURCE_TREE = {
|
||||
"latest_oss_registry_entries_file_blobs": gcs_directory_blobs.configured(
|
||||
{"gcs_bucket": {"env": "METADATA_BUCKET"}, "prefix": METADATA_FOLDER, "match_regex": f".*latest/oss.json$"}
|
||||
),
|
||||
"release_candidate_cloud_registry_entries_file_blobs": gcs_directory_blobs.configured(
|
||||
{"gcs_bucket": {"env": "METADATA_BUCKET"}, "prefix": METADATA_FOLDER, "match_regex": f".*release_candidate/cloud.json$"}
|
||||
),
|
||||
"release_candidate_oss_registry_entries_file_blobs": gcs_directory_blobs.configured(
|
||||
{"gcs_bucket": {"env": "METADATA_BUCKET"}, "prefix": METADATA_FOLDER, "match_regex": f".*release_candidate/oss.json$"}
|
||||
),
|
||||
}
|
||||
|
||||
CONNECTOR_TEST_REPORT_SENSOR_RESOURCE_TREE = {
|
||||
@@ -167,12 +173,26 @@ SENSORS = [
|
||||
gcs_blobs_resource_key="latest_oss_registry_entries_file_blobs",
|
||||
interval=60,
|
||||
),
|
||||
new_gcs_blobs_sensor(
|
||||
job=generate_oss_registry,
|
||||
resources_def=REGISTRY_ENTRY_RESOURCE_TREE,
|
||||
gcs_blobs_resource_key="release_candidate_oss_registry_entries_file_blobs",
|
||||
interval=60,
|
||||
allow_duplicate_runs=True,
|
||||
),
|
||||
new_gcs_blobs_sensor(
|
||||
job=generate_cloud_registry,
|
||||
resources_def=REGISTRY_ENTRY_RESOURCE_TREE,
|
||||
gcs_blobs_resource_key="latest_cloud_registry_entries_file_blobs",
|
||||
interval=60,
|
||||
),
|
||||
new_gcs_blobs_sensor(
|
||||
job=generate_cloud_registry,
|
||||
resources_def=REGISTRY_ENTRY_RESOURCE_TREE,
|
||||
gcs_blobs_resource_key="release_candidate_cloud_registry_entries_file_blobs",
|
||||
interval=60,
|
||||
allow_duplicate_runs=True,
|
||||
),
|
||||
new_gcs_blobs_sensor(
|
||||
job=generate_nightly_reports,
|
||||
resources_def=CONNECTOR_TEST_REPORT_SENSOR_RESOURCE_TREE,
|
||||
@@ -184,7 +204,7 @@ SENSORS = [
|
||||
SCHEDULES = [
|
||||
ScheduleDefinition(job=add_new_metadata_partitions, cron_schedule="*/2 * * * *", tags={"dagster/priority": HIGH_QUEUE_PRIORITY}),
|
||||
ScheduleDefinition(
|
||||
cron_schedule="0 1 * * *", # Daily at 1am US/Pacific
|
||||
cron_schedule="*/2 * * * *", # Every 2 minutes
|
||||
execution_timezone="US/Pacific",
|
||||
job=remove_stale_metadata_partitions,
|
||||
),
|
||||
|
||||
@@ -59,7 +59,7 @@ def _convert_json_to_metrics_dict(jsonl_string: str) -> dict:
|
||||
|
||||
@asset(required_resource_keys={"latest_metrics_gcs_blob"}, group_name=GROUP_NAME)
|
||||
@sentry.instrument_asset_op
|
||||
def latest_connnector_metrics(context: OpExecutionContext) -> dict:
|
||||
def latest_connector_metrics(context: OpExecutionContext) -> dict:
|
||||
latest_metrics_gcs_blob = context.resources.latest_metrics_gcs_blob
|
||||
|
||||
latest_metrics_jsonl = _safe_read_gcs_file(latest_metrics_gcs_blob)
|
||||
|
||||
@@ -13,7 +13,7 @@ import yaml
|
||||
from dagster import OpExecutionContext, Output, asset
|
||||
from github import Repository
|
||||
from orchestrator.logging import sentry
|
||||
from orchestrator.models.metadata import LatestMetadataEntry, MetadataDefinition, PartialMetadataDefinition
|
||||
from orchestrator.models.metadata import LatestMetadataEntry, MetadataDefinition
|
||||
from orchestrator.ops.slack import send_slack_message
|
||||
from orchestrator.utils.dagster_helpers import OutputDataFrame, output_dataframe
|
||||
|
||||
@@ -102,27 +102,56 @@ def github_metadata_definitions(context):
|
||||
return Output(metadata_definitions, metadata={"preview": [md.json() for md in metadata_definitions]})
|
||||
|
||||
|
||||
def entry_is_younger_than_grace_period(entry: LatestMetadataEntry) -> bool:
|
||||
grace_period_marker = datetime.datetime.now(datetime.timezone.utc) - PUBLISH_GRACE_PERIOD
|
||||
entry_last_modified = datetime.datetime.strptime(entry.last_modified, "%a, %d %b %Y %H:%M:%S %Z").replace(tzinfo=datetime.timezone.utc)
|
||||
return entry_last_modified > grace_period_marker
|
||||
|
||||
|
||||
def entry_should_be_on_gcs(metadata_entry: LatestMetadataEntry) -> bool:
|
||||
"""For stale metadata detection, we only want to scan latest metadata files from our master branch that are expected to be on GCS.
|
||||
A metadata file should be on GCS, in the latest directory, if:
|
||||
- it is not archived
|
||||
- not a release candidate
|
||||
- has been published for more than the grace period (just to reduce false positives when publish pipeline and stale detection run concurrently).
|
||||
|
||||
Args:
|
||||
metadata_entry (LatestMetadataEntry): The metadata entry to check
|
||||
|
||||
Returns:
|
||||
bool: True if the metadata entry should be on GCS, False otherwise
|
||||
"""
|
||||
if metadata_entry.metadata_definition.data.supportLevel == "archived":
|
||||
return False
|
||||
if getattr(metadata_entry.metadata_definition.releases, "isReleaseCandidate", False):
|
||||
return False
|
||||
if entry_is_younger_than_grace_period(metadata_entry):
|
||||
return False
|
||||
return True
|
||||
|
||||
|
||||
@asset(required_resource_keys={"slack"}, group_name=GROUP_NAME)
|
||||
def stale_gcs_latest_metadata_file(context, github_metadata_definitions: list, metadata_definitions: list) -> OutputDataFrame:
|
||||
def stale_gcs_latest_metadata_file(context, github_metadata_definitions: list, latest_metadata_entries: list) -> OutputDataFrame:
|
||||
"""
|
||||
Return a list of all metadata files in the github repo and denote whether they are stale or not.
|
||||
|
||||
Stale means that the file in the github repo is not in the latest metadata file blobs.
|
||||
"""
|
||||
|
||||
# TODO:
|
||||
# The logic here is not bulletproof. It can't find release candidate metadata which did not made their way to GCS.
|
||||
# We should improve this logic to be able to detect those cases as well.
|
||||
|
||||
latest_versions_on_gcs = {
|
||||
metadata_entry.metadata_definition.data.dockerRepository: metadata_entry.metadata_definition.data.dockerImageTag
|
||||
for metadata_entry in metadata_definitions
|
||||
for metadata_entry in latest_metadata_entries
|
||||
if metadata_entry.metadata_definition.data.supportLevel != "archived"
|
||||
}
|
||||
|
||||
now = datetime.datetime.now(datetime.timezone.utc)
|
||||
latest_versions_on_github = {
|
||||
metadata_entry.metadata_definition.data.dockerRepository: metadata_entry.metadata_definition.data.dockerImageTag
|
||||
for metadata_entry in github_metadata_definitions
|
||||
if metadata_entry.metadata_definition.data.supportLevel
|
||||
!= "archived" # We give a 2 hour grace period for the metadata to be updated
|
||||
and datetime.datetime.strptime(metadata_entry.last_modified, "%a, %d %b %Y %H:%M:%S %Z").replace(tzinfo=datetime.timezone.utc)
|
||||
< now - PUBLISH_GRACE_PERIOD
|
||||
if entry_should_be_on_gcs(metadata_entry)
|
||||
}
|
||||
|
||||
stale_connectors = []
|
||||
|
||||
@@ -103,16 +103,9 @@ def compute_registry_overrides(merged_df):
|
||||
return registries
|
||||
|
||||
|
||||
# ASSETS
|
||||
|
||||
|
||||
@asset(required_resource_keys={"latest_metadata_file_blobs"}, group_name=GROUP_NAME)
|
||||
@sentry.instrument_asset_op
|
||||
def metadata_definitions(context: OpExecutionContext) -> List[LatestMetadataEntry]:
|
||||
latest_metadata_file_blobs = context.resources.latest_metadata_file_blobs
|
||||
|
||||
def get_metadata_entries(blob_resource) -> Output:
|
||||
metadata_entries = []
|
||||
for blob in latest_metadata_file_blobs:
|
||||
for blob in blob_resource:
|
||||
yaml_string = blob.download_as_string().decode("utf-8")
|
||||
metadata_dict = yaml.safe_load(yaml_string)
|
||||
metadata_def = MetadataDefinition.parse_obj(metadata_dict)
|
||||
@@ -137,4 +130,12 @@ def metadata_definitions(context: OpExecutionContext) -> List[LatestMetadataEntr
|
||||
)
|
||||
metadata_entries.append(metadata_entry)
|
||||
|
||||
return metadata_entries
|
||||
return Output(metadata_entries, metadata={"preview": [m.file_path for m in metadata_entries]})
|
||||
|
||||
|
||||
# ASSETS
|
||||
@asset(required_resource_keys={"latest_metadata_file_blobs"}, group_name=GROUP_NAME)
|
||||
@sentry.instrument_asset_op
|
||||
def latest_metadata_entries(context: OpExecutionContext) -> Output[List[LatestMetadataEntry]]:
|
||||
latest_metadata_file_blobs = context.resources.latest_metadata_file_blobs
|
||||
return get_metadata_entries(latest_metadata_file_blobs)
|
||||
|
||||
@@ -2,21 +2,26 @@
|
||||
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
|
||||
#
|
||||
|
||||
import copy
|
||||
import json
|
||||
from typing import List
|
||||
from typing import List, Union
|
||||
|
||||
import sentry_sdk
|
||||
from dagster import MetadataValue, OpExecutionContext, Output, asset
|
||||
from dagster import AutoMaterializePolicy, MetadataValue, OpExecutionContext, Output, asset
|
||||
from dagster_gcp.gcs.file_manager import GCSFileHandle, GCSFileManager
|
||||
from google.cloud import storage
|
||||
from metadata_service.models.generated.ConnectorRegistryDestinationDefinition import ConnectorRegistryDestinationDefinition
|
||||
from metadata_service.models.generated.ConnectorRegistrySourceDefinition import ConnectorRegistrySourceDefinition
|
||||
from metadata_service.models.generated.ConnectorRegistryV0 import ConnectorRegistryV0
|
||||
from metadata_service.models.transform import to_json_sanitized_dict
|
||||
from orchestrator.assets.registry_entry import ConnectorTypePrimaryKey, ConnectorTypes, read_registry_entry_blob
|
||||
from orchestrator.logging import sentry
|
||||
from orchestrator.logging.publish_connector_lifecycle import PublishConnectorLifecycle, PublishConnectorLifecycleStage, StageStatus
|
||||
from orchestrator.models.metadata import LatestMetadataEntry, MetadataDefinition
|
||||
from orchestrator.utils.object_helpers import default_none_to_dict
|
||||
from pydash.objects import set_with
|
||||
|
||||
PolymorphicRegistryEntry = Union[ConnectorRegistrySourceDefinition, ConnectorRegistryDestinationDefinition]
|
||||
|
||||
GROUP_NAME = "registry"
|
||||
|
||||
|
||||
@@ -79,13 +84,54 @@ def persist_registry_to_json(
|
||||
return file_handle
|
||||
|
||||
|
||||
@sentry_sdk.trace
|
||||
def apply_release_candidates(
|
||||
latest_registry_entry: dict,
|
||||
release_candidate_registry_entry: PolymorphicRegistryEntry,
|
||||
) -> dict:
|
||||
updated_registry_entry = copy.deepcopy(latest_registry_entry)
|
||||
updated_registry_entry.setdefault("releases", {})
|
||||
updated_registry_entry["releases"]["releaseCandidates"] = {
|
||||
release_candidate_registry_entry.dockerImageTag: to_json_sanitized_dict(release_candidate_registry_entry)
|
||||
}
|
||||
return updated_registry_entry
|
||||
|
||||
|
||||
def apply_release_candidate_entries(registry_entry_dict: dict, docker_repository_to_rc_registry_entry: dict) -> dict:
|
||||
"""Apply the optionally existing release candidate entries to the registry entry.
|
||||
We need both the release candidate metadata entry and the release candidate registry entry because the metadata entry contains the rollout configuration, and the registry entry contains the actual RC registry entry.
|
||||
|
||||
Args:
|
||||
registry_entry_dict (dict): The registry entry.
|
||||
docker_repository_to_rc_registry_entry (dict): Mapping of docker repository to release candidate registry entry.
|
||||
|
||||
Returns:
|
||||
dict: The registry entry with release candidates applied.
|
||||
"""
|
||||
registry_entry_dict = copy.deepcopy(registry_entry_dict)
|
||||
if registry_entry_dict["dockerRepository"] in docker_repository_to_rc_registry_entry:
|
||||
release_candidate_registry_entry = docker_repository_to_rc_registry_entry[registry_entry_dict["dockerRepository"]]
|
||||
registry_entry_dict = apply_release_candidates(registry_entry_dict, release_candidate_registry_entry)
|
||||
return registry_entry_dict
|
||||
|
||||
|
||||
def get_connector_type_from_registry_entry(registry_entry: PolymorphicRegistryEntry) -> ConnectorTypes:
|
||||
if hasattr(registry_entry, "sourceDefinitionId"):
|
||||
return ConnectorTypes.SOURCE
|
||||
elif hasattr(registry_entry, "destinationDefinitionId"):
|
||||
return ConnectorTypes.DESTINATION
|
||||
else:
|
||||
raise ValueError("Registry entry is not a source or destination")
|
||||
|
||||
|
||||
@sentry_sdk.trace
|
||||
def generate_and_persist_registry(
|
||||
context: OpExecutionContext,
|
||||
registry_entry_file_blobs: List[storage.Blob],
|
||||
latest_registry_entries: List,
|
||||
release_candidate_registry_entries: List,
|
||||
registry_directory_manager: GCSFileManager,
|
||||
registry_name: str,
|
||||
latest_connnector_metrics: dict,
|
||||
latest_connector_metrics: dict,
|
||||
) -> Output[ConnectorRegistryV0]:
|
||||
"""Generate the selected registry from the metadata files, and persist it to GCS.
|
||||
|
||||
@@ -104,14 +150,21 @@ def generate_and_persist_registry(
|
||||
)
|
||||
|
||||
registry_dict = {"sources": [], "destinations": []}
|
||||
for blob in registry_entry_file_blobs:
|
||||
connector_type, registry_entry = read_registry_entry_blob(blob)
|
||||
plural_connector_type = f"{connector_type}s"
|
||||
|
||||
# We santiize the registry entry to ensure its in a format
|
||||
docker_repository_to_rc_registry_entry = {
|
||||
release_candidate_registry_entries.dockerRepository: release_candidate_registry_entries
|
||||
for release_candidate_registry_entries in release_candidate_registry_entries
|
||||
}
|
||||
|
||||
for latest_registry_entry in latest_registry_entries:
|
||||
connector_type = get_connector_type_from_registry_entry(latest_registry_entry)
|
||||
plural_connector_type = f"{connector_type.value}s"
|
||||
|
||||
# We sanitize the registry entry to ensure its in a format
|
||||
# that can be parsed by pydantic.
|
||||
registry_entry_dict = to_json_sanitized_dict(registry_entry)
|
||||
enriched_registry_entry_dict = apply_metrics_to_registry_entry(registry_entry_dict, connector_type, latest_connnector_metrics)
|
||||
registry_entry_dict = to_json_sanitized_dict(latest_registry_entry)
|
||||
enriched_registry_entry_dict = apply_metrics_to_registry_entry(registry_entry_dict, connector_type, latest_connector_metrics)
|
||||
enriched_registry_entry_dict = apply_release_candidate_entries(enriched_registry_entry_dict, docker_repository_to_rc_registry_entry)
|
||||
|
||||
registry_dict[plural_connector_type].append(enriched_registry_entry_dict)
|
||||
|
||||
@@ -137,46 +190,66 @@ def generate_and_persist_registry(
|
||||
|
||||
|
||||
@asset(
|
||||
required_resource_keys={"slack", "registry_directory_manager", "latest_oss_registry_entries_file_blobs", "latest_metrics_gcs_blob"},
|
||||
required_resource_keys={
|
||||
"slack",
|
||||
"registry_directory_manager",
|
||||
"latest_oss_registry_entries_file_blobs",
|
||||
"release_candidate_oss_registry_entries_file_blobs",
|
||||
"latest_metrics_gcs_blob",
|
||||
},
|
||||
group_name=GROUP_NAME,
|
||||
)
|
||||
@sentry.instrument_asset_op
|
||||
def persisted_oss_registry(context: OpExecutionContext, latest_connnector_metrics: dict) -> Output[ConnectorRegistryV0]:
|
||||
def persisted_oss_registry(
|
||||
context: OpExecutionContext,
|
||||
latest_connector_metrics: dict,
|
||||
latest_oss_registry_entries: List,
|
||||
release_candidate_oss_registry_entries: List,
|
||||
) -> Output[ConnectorRegistryV0]:
|
||||
"""
|
||||
This asset is used to generate the oss registry from the registry entries.
|
||||
"""
|
||||
registry_name = "oss"
|
||||
registry_directory_manager = context.resources.registry_directory_manager
|
||||
latest_oss_registry_entries_file_blobs = context.resources.latest_oss_registry_entries_file_blobs
|
||||
|
||||
return generate_and_persist_registry(
|
||||
context=context,
|
||||
registry_entry_file_blobs=latest_oss_registry_entries_file_blobs,
|
||||
latest_registry_entries=latest_oss_registry_entries,
|
||||
release_candidate_registry_entries=release_candidate_oss_registry_entries,
|
||||
registry_directory_manager=registry_directory_manager,
|
||||
registry_name=registry_name,
|
||||
latest_connnector_metrics=latest_connnector_metrics,
|
||||
latest_connector_metrics=latest_connector_metrics,
|
||||
)
|
||||
|
||||
|
||||
@asset(
|
||||
required_resource_keys={"slack", "registry_directory_manager", "latest_cloud_registry_entries_file_blobs", "latest_metrics_gcs_blob"},
|
||||
required_resource_keys={
|
||||
"slack",
|
||||
"registry_directory_manager",
|
||||
"latest_cloud_registry_entries_file_blobs",
|
||||
"release_candidate_cloud_registry_entries_file_blobs",
|
||||
"latest_metrics_gcs_blob",
|
||||
},
|
||||
group_name=GROUP_NAME,
|
||||
)
|
||||
@sentry.instrument_asset_op
|
||||
def persisted_cloud_registry(context: OpExecutionContext, latest_connnector_metrics: dict) -> Output[ConnectorRegistryV0]:
|
||||
def persisted_cloud_registry(
|
||||
context: OpExecutionContext,
|
||||
latest_connector_metrics: dict,
|
||||
latest_cloud_registry_entries: List,
|
||||
release_candidate_cloud_registry_entries: List,
|
||||
) -> Output[ConnectorRegistryV0]:
|
||||
"""
|
||||
This asset is used to generate the cloud registry from the registry entries.
|
||||
"""
|
||||
registry_name = "cloud"
|
||||
registry_directory_manager = context.resources.registry_directory_manager
|
||||
latest_cloud_registry_entries_file_blobs = context.resources.latest_cloud_registry_entries_file_blobs
|
||||
|
||||
return generate_and_persist_registry(
|
||||
context=context,
|
||||
registry_entry_file_blobs=latest_cloud_registry_entries_file_blobs,
|
||||
latest_registry_entries=latest_cloud_registry_entries,
|
||||
release_candidate_registry_entries=release_candidate_cloud_registry_entries,
|
||||
registry_directory_manager=registry_directory_manager,
|
||||
registry_name=registry_name,
|
||||
latest_connnector_metrics=latest_connnector_metrics,
|
||||
latest_connector_metrics=latest_connector_metrics,
|
||||
)
|
||||
|
||||
|
||||
|
||||
@@ -11,6 +11,7 @@ from typing import List, Optional, Tuple, Union
|
||||
|
||||
import orchestrator.hacks as HACKS
|
||||
import pandas as pd
|
||||
import semver
|
||||
import sentry_sdk
|
||||
from dagster import AutoMaterializePolicy, DynamicPartitionsDefinition, MetadataValue, OpExecutionContext, Output, asset
|
||||
from dagster_gcp.gcs.file_manager import GCSFileHandle, GCSFileManager
|
||||
@@ -90,25 +91,29 @@ def calculate_migration_documentation_url(releases_or_breaking_change: dict, doc
|
||||
|
||||
|
||||
@deep_copy_params
|
||||
def apply_connector_release_defaults(metadata: dict) -> Optional[pd.DataFrame]:
|
||||
metadata_releases = metadata.get("releases")
|
||||
def apply_connector_releases(metadata: dict) -> Optional[pd.DataFrame]:
|
||||
documentation_url = metadata.get("documentationUrl")
|
||||
if metadata_releases is None:
|
||||
return None
|
||||
final_registry_releases = {}
|
||||
|
||||
# apply defaults for connector releases
|
||||
metadata_releases["migrationDocumentationUrl"] = calculate_migration_documentation_url(metadata_releases, documentation_url)
|
||||
if metadata.get("releases", {}).get("breakingChanges"):
|
||||
# apply defaults for connector releases
|
||||
final_registry_releases["migrationDocumentationUrl"] = calculate_migration_documentation_url(
|
||||
metadata["releases"], documentation_url
|
||||
)
|
||||
|
||||
# releases has a dictionary field called breakingChanges, where the key is the version and the value is the data for the breaking change
|
||||
# each breaking change has a migrationDocumentationUrl field that is optional, so we need to apply defaults to it
|
||||
breaking_changes = metadata_releases["breakingChanges"]
|
||||
if breaking_changes is not None:
|
||||
for version, breaking_change in breaking_changes.items():
|
||||
breaking_change["migrationDocumentationUrl"] = calculate_migration_documentation_url(
|
||||
breaking_change, documentation_url, version
|
||||
)
|
||||
# releases has a dictionary field called breakingChanges, where the key is the version and the value is the data for the breaking change
|
||||
# each breaking change has a migrationDocumentationUrl field that is optional, so we need to apply defaults to it
|
||||
breaking_changes = metadata["releases"]["breakingChanges"]
|
||||
if breaking_changes is not None:
|
||||
for version, breaking_change in breaking_changes.items():
|
||||
breaking_change["migrationDocumentationUrl"] = calculate_migration_documentation_url(
|
||||
breaking_change, documentation_url, version
|
||||
)
|
||||
final_registry_releases["breakingChanges"] = breaking_changes
|
||||
|
||||
return metadata_releases
|
||||
if metadata.get("releases", {}).get("rolloutConfiguration"):
|
||||
final_registry_releases["rolloutConfiguration"] = metadata["releases"]["rolloutConfiguration"]
|
||||
return final_registry_releases
|
||||
|
||||
|
||||
@deep_copy_params
|
||||
@@ -278,8 +283,7 @@ def metadata_to_registry_entry(metadata_entry: LatestMetadataEntry, override_reg
|
||||
|
||||
# apply generated fields
|
||||
overridden_metadata_data["iconUrl"] = metadata_entry.icon_url
|
||||
overridden_metadata_data["releases"] = apply_connector_release_defaults(overridden_metadata_data)
|
||||
|
||||
overridden_metadata_data["releases"] = apply_connector_releases(overridden_metadata_data)
|
||||
return overridden_metadata_data
|
||||
|
||||
|
||||
@@ -303,7 +307,7 @@ def get_connector_type_from_registry_entry(registry_entry: dict) -> TaggedRegist
|
||||
raise Exception("Could not determine connector type from registry entry")
|
||||
|
||||
|
||||
def _get_latest_entry_write_path(metadata_path: Optional[str], registry_name: str) -> str:
|
||||
def _get_directory_write_path(metadata_path: Optional[str], registry_name: str) -> str:
|
||||
"""Get the write path for the registry entry, assuming the metadata entry is the latest version."""
|
||||
if metadata_path is None:
|
||||
raise Exception(f"Metadata entry {metadata_entry} does not have a file path")
|
||||
@@ -316,9 +320,9 @@ def get_registry_entry_write_path(
|
||||
registry_entry: Optional[PolymorphicRegistryEntry], metadata_entry: LatestMetadataEntry, registry_name: str
|
||||
) -> str:
|
||||
"""Get the write path for the registry entry."""
|
||||
if metadata_entry.is_latest_version_path:
|
||||
# if the metadata entry is the latest version, write the registry entry to the same path as the metadata entry
|
||||
return _get_latest_entry_write_path(metadata_entry.file_path, registry_name)
|
||||
if metadata_entry.is_latest_version_path or metadata_entry.is_release_candidate_version_path:
|
||||
# if the metadata entry is the latest or RC version, write the registry entry to the same path as the metadata entry
|
||||
return _get_directory_write_path(metadata_entry.file_path, registry_name)
|
||||
else:
|
||||
if registry_entry is None:
|
||||
raise Exception(f"Could not determine write path for registry entry {registry_entry} because it is None")
|
||||
@@ -353,6 +357,30 @@ def persist_registry_entry_to_json(
|
||||
return file_handle
|
||||
|
||||
|
||||
def generate_registry_entry(
|
||||
metadata_entry: LatestMetadataEntry,
|
||||
spec_cache: SpecCache,
|
||||
registry_name: str,
|
||||
) -> PolymorphicRegistryEntry:
|
||||
"""Generate a registry entry given a metadata entry.
|
||||
Enriches the metadata entry with spec and release candidate information.
|
||||
|
||||
Args:
|
||||
metadata_entry (LatestMetadataEntry): The metadata entry.
|
||||
spec_cache (SpecCache): The spec cache.
|
||||
registry_name (str): The name of the registry_entry. One of "cloud" or "oss".
|
||||
|
||||
Returns:
|
||||
PolymorphicRegistryEntry: The registry entry (could be a source or destination entry).
|
||||
"""
|
||||
raw_entry_dict = metadata_to_registry_entry(metadata_entry, registry_name)
|
||||
registry_entry_with_spec = apply_spec_to_registry_entry(raw_entry_dict, spec_cache, registry_name)
|
||||
|
||||
_, ConnectorModel = get_connector_type_from_registry_entry(registry_entry_with_spec)
|
||||
|
||||
return ConnectorModel.parse_obj(registry_entry_with_spec)
|
||||
|
||||
|
||||
@sentry_sdk.trace
|
||||
def generate_and_persist_registry_entry(
|
||||
metadata_entry: LatestMetadataEntry,
|
||||
@@ -363,19 +391,14 @@ def generate_and_persist_registry_entry(
|
||||
"""Generate the selected registry from the metadata files, and persist it to GCS.
|
||||
|
||||
Args:
|
||||
context (OpExecutionContext): The execution context.
|
||||
metadata_entry (List[LatestMetadataEntry]): The metadata definitions.
|
||||
cached_specs (OutputDataFrame): The cached specs.
|
||||
|
||||
metadata_entry (List[LatestMetadataEntry]): The metadata entry.
|
||||
spec_cache (SpecCache): The spec cache.
|
||||
metadata_directory_manager (GCSFileManager): The metadata directory manager.
|
||||
registry_name (str): The name of the registry_entry. One of "cloud" or "oss".
|
||||
Returns:
|
||||
Output[ConnectorRegistryV0]: The registry.
|
||||
str: The public url of the registry entry.
|
||||
"""
|
||||
raw_entry_dict = metadata_to_registry_entry(metadata_entry, registry_name)
|
||||
registry_entry_with_spec = apply_spec_to_registry_entry(raw_entry_dict, spec_cache, registry_name)
|
||||
|
||||
_, ConnectorModel = get_connector_type_from_registry_entry(registry_entry_with_spec)
|
||||
|
||||
registry_model = ConnectorModel.parse_obj(registry_entry_with_spec)
|
||||
registry_model = generate_registry_entry(metadata_entry, spec_cache, registry_name)
|
||||
|
||||
file_handle = persist_registry_entry_to_json(registry_model, registry_name, metadata_entry, metadata_directory_manager)
|
||||
|
||||
@@ -585,7 +608,10 @@ def metadata_entry(context: OpExecutionContext) -> Output[Optional[LatestMetadat
|
||||
auto_materialize_policy=AutoMaterializePolicy.eager(max_materializations_per_minute=MAX_METADATA_PARTITION_RUN_REQUEST),
|
||||
)
|
||||
@sentry.instrument_asset_op
|
||||
def registry_entry(context: OpExecutionContext, metadata_entry: Optional[LatestMetadataEntry]) -> Output[Optional[dict]]:
|
||||
def registry_entry(
|
||||
context: OpExecutionContext,
|
||||
metadata_entry: Optional[LatestMetadataEntry],
|
||||
) -> Output[Optional[dict]]:
|
||||
"""
|
||||
Generate the registry entry files from the given metadata file, and persist it to GCS.
|
||||
"""
|
||||
@@ -613,7 +639,12 @@ def registry_entry(context: OpExecutionContext, metadata_entry: Optional[LatestM
|
||||
enabled_registries, disabled_registries = get_registry_status_lists(metadata_entry)
|
||||
|
||||
persisted_registry_entries = {
|
||||
registry_name: generate_and_persist_registry_entry(metadata_entry, spec_cache, root_metadata_directory_manager, registry_name)
|
||||
registry_name: generate_and_persist_registry_entry(
|
||||
metadata_entry,
|
||||
spec_cache,
|
||||
root_metadata_directory_manager,
|
||||
registry_name,
|
||||
)
|
||||
for registry_name in enabled_registries
|
||||
}
|
||||
|
||||
@@ -663,3 +694,36 @@ def registry_entry(context: OpExecutionContext, metadata_entry: Optional[LatestM
|
||||
)
|
||||
|
||||
return Output(metadata=dagster_metadata, value=persisted_registry_entries)
|
||||
|
||||
|
||||
def get_registry_entries(blob_resource) -> Output[List]:
|
||||
registry_entries = []
|
||||
for blob in blob_resource:
|
||||
_, registry_entry = read_registry_entry_blob(blob)
|
||||
registry_entries.append(registry_entry)
|
||||
|
||||
return Output(registry_entries)
|
||||
|
||||
|
||||
@asset(required_resource_keys={"latest_cloud_registry_entries_file_blobs"}, group_name=GROUP_NAME)
|
||||
@sentry.instrument_asset_op
|
||||
def latest_cloud_registry_entries(context: OpExecutionContext) -> Output[List]:
|
||||
return get_registry_entries(context.resources.latest_cloud_registry_entries_file_blobs)
|
||||
|
||||
|
||||
@asset(required_resource_keys={"latest_oss_registry_entries_file_blobs"}, group_name=GROUP_NAME)
|
||||
@sentry.instrument_asset_op
|
||||
def latest_oss_registry_entries(context: OpExecutionContext) -> Output[List]:
|
||||
return get_registry_entries(context.resources.latest_oss_registry_entries_file_blobs)
|
||||
|
||||
|
||||
@asset(required_resource_keys={"release_candidate_cloud_registry_entries_file_blobs"}, group_name=GROUP_NAME)
|
||||
@sentry.instrument_asset_op
|
||||
def release_candidate_cloud_registry_entries(context: OpExecutionContext) -> Output[List]:
|
||||
return get_registry_entries(context.resources.release_candidate_cloud_registry_entries_file_blobs)
|
||||
|
||||
|
||||
@asset(required_resource_keys={"release_candidate_oss_registry_entries_file_blobs"}, group_name=GROUP_NAME)
|
||||
@sentry.instrument_asset_op
|
||||
def release_candidate_oss_registry_entries(context: OpExecutionContext) -> Output[List]:
|
||||
return get_registry_entries(context.resources.release_candidate_oss_registry_entries_file_blobs)
|
||||
|
||||
@@ -3,13 +3,14 @@
|
||||
#
|
||||
|
||||
from dagster import AssetSelection, SkipReason, define_asset_job, job, op
|
||||
from orchestrator.assets import registry_entry
|
||||
from orchestrator.assets import metadata, registry, registry_entry, specs_secrets_mask
|
||||
from orchestrator.config import HIGH_QUEUE_PRIORITY, MAX_METADATA_PARTITION_RUN_REQUEST
|
||||
from orchestrator.logging.publish_connector_lifecycle import PublishConnectorLifecycle, PublishConnectorLifecycleStage, StageStatus
|
||||
|
||||
oss_registry_inclusive = AssetSelection.keys("persisted_oss_registry", "specs_secrets_mask_yaml").upstream()
|
||||
generate_oss_registry = define_asset_job(name="generate_oss_registry", selection=oss_registry_inclusive)
|
||||
|
||||
|
||||
cloud_registry_inclusive = AssetSelection.keys("persisted_cloud_registry", "specs_secrets_mask_yaml").upstream()
|
||||
generate_cloud_registry = define_asset_job(name="generate_cloud_registry", selection=cloud_registry_inclusive)
|
||||
|
||||
|
||||
@@ -65,6 +65,14 @@ class LatestMetadataEntry(BaseModel):
|
||||
ending_path = f"latest/{METADATA_FILE_NAME}"
|
||||
return self.file_path.endswith(ending_path)
|
||||
|
||||
@property
|
||||
def is_release_candidate_version_path(self) -> bool:
|
||||
"""
|
||||
Path is considered a latest version path if the subfolder containing METADATA_FILE_NAME is "latest"
|
||||
"""
|
||||
ending_path = f"release_candidate/{METADATA_FILE_NAME}"
|
||||
return self.file_path.endswith(ending_path)
|
||||
|
||||
@property
|
||||
def dependency_file_url(self) -> Optional[str]:
|
||||
if not self.bucket_name or not self.metadata_definition:
|
||||
|
||||
@@ -2,6 +2,8 @@
|
||||
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
|
||||
#
|
||||
|
||||
import time
|
||||
|
||||
from dagster import (
|
||||
DefaultSensorStatus,
|
||||
RunRequest,
|
||||
@@ -21,6 +23,7 @@ def new_gcs_blobs_sensor(
|
||||
job,
|
||||
interval,
|
||||
resources_def,
|
||||
allow_duplicate_runs=False,
|
||||
) -> SensorDefinition:
|
||||
"""
|
||||
This sensor is responsible for polling a list of gcs blobs and triggering a job when the list changes.
|
||||
@@ -42,7 +45,6 @@ def new_gcs_blobs_sensor(
|
||||
context.log.info(f"Old etag cursor: {context.cursor}")
|
||||
|
||||
gcs_blobs_resource = getattr(resources, gcs_blobs_resource_key)
|
||||
|
||||
new_etags_cursor = string_array_to_hash([blob.etag for blob in gcs_blobs_resource])
|
||||
context.log.info(f"New etag cursor: {new_etags_cursor}")
|
||||
|
||||
@@ -54,6 +56,11 @@ def new_gcs_blobs_sensor(
|
||||
context.update_cursor(new_etags_cursor)
|
||||
context.log.info(f"New {gcs_blobs_resource_key} in GCS bucket")
|
||||
run_key = f"{sensor_name}:{new_etags_cursor}"
|
||||
# Dagster skips runs with the same run_key
|
||||
# It means that if the GCS blob list changed back to a state which was already processed, the run will be skipped
|
||||
# This is not desirable in cases we want to reprocess the same data again after a blob deletion
|
||||
if allow_duplicate_runs:
|
||||
run_key += f":{int(time.time())}"
|
||||
return RunRequest(run_key=run_key)
|
||||
|
||||
return new_gcs_blobs_sensor_definition
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
# This file is automatically @generated by Poetry 1.8.2 and should not be changed by hand.
|
||||
# This file is automatically @generated by Poetry 1.7.1 and should not be changed by hand.
|
||||
|
||||
[[package]]
|
||||
name = "alembic"
|
||||
@@ -1747,7 +1747,7 @@ files = [
|
||||
|
||||
[[package]]
|
||||
name = "metadata-service"
|
||||
version = "0.13.0"
|
||||
version = "0.14.0"
|
||||
description = ""
|
||||
optional = false
|
||||
python-versions = "^3.9"
|
||||
@@ -2514,6 +2514,7 @@ files = [
|
||||
{file = "PyYAML-6.0.1-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:69b023b2b4daa7548bcfbd4aa3da05b3a74b772db9e23b982788168117739938"},
|
||||
{file = "PyYAML-6.0.1-cp310-cp310-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:81e0b275a9ecc9c0c0c07b4b90ba548307583c125f54d5b6946cfee6360c733d"},
|
||||
{file = "PyYAML-6.0.1-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:ba336e390cd8e4d1739f42dfe9bb83a3cc2e80f567d8805e11b46f4a943f5515"},
|
||||
{file = "PyYAML-6.0.1-cp310-cp310-musllinux_1_1_x86_64.whl", hash = "sha256:326c013efe8048858a6d312ddd31d56e468118ad4cdeda36c719bf5bb6192290"},
|
||||
{file = "PyYAML-6.0.1-cp310-cp310-win32.whl", hash = "sha256:bd4af7373a854424dabd882decdc5579653d7868b8fb26dc7d0e99f823aa5924"},
|
||||
{file = "PyYAML-6.0.1-cp310-cp310-win_amd64.whl", hash = "sha256:fd1592b3fdf65fff2ad0004b5e363300ef59ced41c2e6b3a99d4089fa8c5435d"},
|
||||
{file = "PyYAML-6.0.1-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:6965a7bc3cf88e5a1c3bd2e0b5c22f8d677dc88a455344035f03399034eb3007"},
|
||||
@@ -2521,8 +2522,15 @@ files = [
|
||||
{file = "PyYAML-6.0.1-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:42f8152b8dbc4fe7d96729ec2b99c7097d656dc1213a3229ca5383f973a5ed6d"},
|
||||
{file = "PyYAML-6.0.1-cp311-cp311-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:062582fca9fabdd2c8b54a3ef1c978d786e0f6b3a1510e0ac93ef59e0ddae2bc"},
|
||||
{file = "PyYAML-6.0.1-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:d2b04aac4d386b172d5b9692e2d2da8de7bfb6c387fa4f801fbf6fb2e6ba4673"},
|
||||
{file = "PyYAML-6.0.1-cp311-cp311-musllinux_1_1_x86_64.whl", hash = "sha256:e7d73685e87afe9f3b36c799222440d6cf362062f78be1013661b00c5c6f678b"},
|
||||
{file = "PyYAML-6.0.1-cp311-cp311-win32.whl", hash = "sha256:1635fd110e8d85d55237ab316b5b011de701ea0f29d07611174a1b42f1444741"},
|
||||
{file = "PyYAML-6.0.1-cp311-cp311-win_amd64.whl", hash = "sha256:bf07ee2fef7014951eeb99f56f39c9bb4af143d8aa3c21b1677805985307da34"},
|
||||
{file = "PyYAML-6.0.1-cp312-cp312-macosx_10_9_x86_64.whl", hash = "sha256:855fb52b0dc35af121542a76b9a84f8d1cd886ea97c84703eaa6d88e37a2ad28"},
|
||||
{file = "PyYAML-6.0.1-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:40df9b996c2b73138957fe23a16a4f0ba614f4c0efce1e9406a184b6d07fa3a9"},
|
||||
{file = "PyYAML-6.0.1-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:6c22bec3fbe2524cde73d7ada88f6566758a8f7227bfbf93a408a9d86bcc12a0"},
|
||||
{file = "PyYAML-6.0.1-cp312-cp312-musllinux_1_1_x86_64.whl", hash = "sha256:8d4e9c88387b0f5c7d5f281e55304de64cf7f9c0021a3525bd3b1c542da3b0e4"},
|
||||
{file = "PyYAML-6.0.1-cp312-cp312-win32.whl", hash = "sha256:d483d2cdf104e7c9fa60c544d92981f12ad66a457afae824d146093b8c294c54"},
|
||||
{file = "PyYAML-6.0.1-cp312-cp312-win_amd64.whl", hash = "sha256:0d3304d8c0adc42be59c5f8a4d9e3d7379e6955ad754aa9d6ab7a398b59dd1df"},
|
||||
{file = "PyYAML-6.0.1-cp36-cp36m-macosx_10_9_x86_64.whl", hash = "sha256:50550eb667afee136e9a77d6dc71ae76a44df8b3e51e41b77f6de2932bfe0f47"},
|
||||
{file = "PyYAML-6.0.1-cp36-cp36m-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:1fe35611261b29bd1de0070f0b2f47cb6ff71fa6595c077e42bd0c419fa27b98"},
|
||||
{file = "PyYAML-6.0.1-cp36-cp36m-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:704219a11b772aea0d8ecd7058d0082713c3562b4e271b849ad7dc4a5c90c13c"},
|
||||
@@ -2539,6 +2547,7 @@ files = [
|
||||
{file = "PyYAML-6.0.1-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:a0cd17c15d3bb3fa06978b4e8958dcdc6e0174ccea823003a106c7d4d7899ac5"},
|
||||
{file = "PyYAML-6.0.1-cp38-cp38-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:28c119d996beec18c05208a8bd78cbe4007878c6dd15091efb73a30e90539696"},
|
||||
{file = "PyYAML-6.0.1-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:7e07cbde391ba96ab58e532ff4803f79c4129397514e1413a7dc761ccd755735"},
|
||||
{file = "PyYAML-6.0.1-cp38-cp38-musllinux_1_1_x86_64.whl", hash = "sha256:49a183be227561de579b4a36efbb21b3eab9651dd81b1858589f796549873dd6"},
|
||||
{file = "PyYAML-6.0.1-cp38-cp38-win32.whl", hash = "sha256:184c5108a2aca3c5b3d3bf9395d50893a7ab82a38004c8f61c258d4428e80206"},
|
||||
{file = "PyYAML-6.0.1-cp38-cp38-win_amd64.whl", hash = "sha256:1e2722cc9fbb45d9b87631ac70924c11d3a401b2d7f410cc0e3bbf249f2dca62"},
|
||||
{file = "PyYAML-6.0.1-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:9eb6caa9a297fc2c2fb8862bc5370d0303ddba53ba97e71f08023b6cd73d16a8"},
|
||||
@@ -2546,6 +2555,7 @@ files = [
|
||||
{file = "PyYAML-6.0.1-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:5773183b6446b2c99bb77e77595dd486303b4faab2b086e7b17bc6bef28865f6"},
|
||||
{file = "PyYAML-6.0.1-cp39-cp39-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:b786eecbdf8499b9ca1d697215862083bd6d2a99965554781d0d8d1ad31e13a0"},
|
||||
{file = "PyYAML-6.0.1-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:bc1bf2925a1ecd43da378f4db9e4f799775d6367bdb94671027b73b393a7c42c"},
|
||||
{file = "PyYAML-6.0.1-cp39-cp39-musllinux_1_1_x86_64.whl", hash = "sha256:04ac92ad1925b2cff1db0cfebffb6ffc43457495c9b3c39d3fcae417d7125dc5"},
|
||||
{file = "PyYAML-6.0.1-cp39-cp39-win32.whl", hash = "sha256:faca3bdcf85b2fc05d06ff3fbc1f83e1391b3e724afa3feba7d13eeab355484c"},
|
||||
{file = "PyYAML-6.0.1-cp39-cp39-win_amd64.whl", hash = "sha256:510c9deebc5c0225e8c96813043e62b680ba2f9c50a08d3724c7f28a747d1486"},
|
||||
{file = "PyYAML-6.0.1.tar.gz", hash = "sha256:bfdf460b1736c775f2ba9f6a92bca30bc2095067b8a9d77876d1fad6cc3b4a43"},
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
[tool.poetry]
|
||||
name = "orchestrator"
|
||||
version = "0.4.1"
|
||||
version = "0.5.0"
|
||||
description = ""
|
||||
authors = ["Ben Church <ben@airbyte.io>"]
|
||||
readme = "README.md"
|
||||
|
||||
Reference in New Issue
Block a user