1
0
mirror of synced 2025-12-19 18:14:56 -05:00
Files
Maxime Carbonneau-Leclerc 4c0aecd01b chore: gitlab cdk 7 migration (#65997)
Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com>
2025-09-10 14:39:15 -04:00

72 lines
2.9 KiB
Python

#
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
#
from dataclasses import dataclass
from typing import Any, Iterable, Tuple
from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.declarative.partition_routers.substream_partition_router import SubstreamPartitionRouter
from airbyte_cdk.sources.declarative.types import StreamSlice
from airbyte_cdk.sources.declarative.validators import ValidationStrategy
from airbyte_cdk.utils import is_cloud_environment
@dataclass
class ValidateApiUrl(ValidationStrategy):
    """Validation strategy for a configured API URL.

    A value is accepted when it is a string that contains a dot, carries
    either no scheme or an ``http``/``https`` scheme, and has nothing after
    the host (any ``/`` following the scheme separator is rejected). Plain
    ``http`` is additionally refused when ``is_cloud_environment()`` is true.
    """

    def validate(self, value: Any) -> None:
        """Check ``value`` and raise if it is not an acceptable API URL.

        :param value: candidate URL, with or without a ``scheme://`` prefix.
        :raises ValueError: if the value is malformed, or if it uses the
            ``http`` scheme while ``is_cloud_environment()`` returns true.
        """
        is_valid, scheme, _ = self._parse_url(url=value)
        if not is_valid:
            raise ValueError("Invalid API resource locator.")
        if scheme == "http" and is_cloud_environment():
            raise ValueError("Http scheme is not allowed in this environment. Please use `https` instead.")

    @staticmethod
    def _parse_url(url: str) -> Tuple[bool, str, str]:
        """Split ``url`` into its scheme and host.

        :param url: candidate URL; the scheme defaults to ``https`` when the
            value has no ``://`` separator.
        :return: ``(True, scheme, host)`` for an accepted value, otherwise
            ``(False, "", "")``. This method never raises for bad input.
        """
        # Non-string input (e.g. a null config value) is reported as invalid
        # rather than leaking a TypeError from the membership test below.
        if not isinstance(url, str) or "." not in url:
            return False, "", ""

        # partition() splits on the first "://" only, so a value holding
        # several separators cannot break tuple unpacking; the leftover
        # "://" is then rejected by the "/" check below.
        scheme, separator, remainder = url.partition("://")
        if not separator:
            scheme, remainder = "https", url

        if scheme not in ("http", "https"):
            return False, "", ""

        # Only a bare host is allowed: any path (even a trailing slash) is invalid.
        if "/" in remainder:
            return False, "", ""

        return True, scheme, remainder
@dataclass
class GroupStreamsPartitionRouter(SubstreamPartitionRouter):
    """Partition router that emits one slice per record of a selected parent stream.

    The parent is picked by name from ``parent_stream_configs``: when the
    config has a truthy ``groups_list`` the parent named
    ``include_descendant_groups`` is read, otherwise the parent named
    ``groups_list`` is read.
    """

    def stream_slices(self) -> Iterable[StreamSlice]:
        """Yield a ``StreamSlice`` carrying the ``id`` of every parent record.

        :return: slices whose partition is ``{"id": <record id>}`` and whose
            cursor slice is empty.
        :raises KeyError: if the selected parent name is not among the
            configured parent streams.
        """
        # Index every configured parent stream by its name.
        streams_by_name = {}
        for parent_config in self.parent_stream_configs:
            streams_by_name[parent_config.stream.name] = parent_config.stream

        if self.config.get("groups_list"):
            parent_name = "include_descendant_groups"
        else:
            parent_name = "groups_list"
        source_stream = streams_by_name[parent_name]

        for parent_partition in source_stream.generate_partitions():
            for parent_record in parent_partition.read():
                group_id = parent_record["id"]
                yield StreamSlice(partition={"id": group_id}, cursor_slice={})
@dataclass
class ProjectStreamsPartitionRouter(SubstreamPartitionRouter):
    """Partition router that emits one slice per project path.

    Project paths are gathered from the ``projects`` entries of every record
    of the first configured parent stream, and combined with the optional
    ``projects_list`` config value:

    * parent yielded projects, no ``projects_list``: every parent project;
    * parent yielded projects, ``projects_list`` set: only parent projects
      that also appear in ``projects_list``;
    * parent yielded no projects: every entry of ``projects_list``.
    """

    def stream_slices(self) -> Iterable[StreamSlice]:
        """Yield a ``StreamSlice`` for each selected project.

        :return: slices whose partition is ``{"id": <path>}`` with every
            ``/`` in the path replaced by ``%2F``, and an empty cursor slice.
        """
        parent_stream = self.parent_stream_configs[0].stream

        # dict.get's default only applies when the key is absent; a key that
        # is present with a null value must also become an empty list, or the
        # fallback loop below would try to iterate over None.
        projects_list = self.config.get("projects_list") or []

        # The parent is read completely before any slice is emitted, because
        # the selection below depends on whether it produced any project.
        group_project_ids = [
            project["path_with_namespace"]
            for partition in parent_stream.generate_partitions()
            for record in partition.read()
            for project in record["projects"]
        ]

        if not group_project_ids:
            selected_ids = projects_list
        elif projects_list:
            selected_ids = [project_id for project_id in group_project_ids if project_id in projects_list]
        else:
            selected_ids = group_project_ids

        for project_id in selected_ids:
            yield StreamSlice(partition={"id": project_id.replace("/", "%2F")}, cursor_slice={})