Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com>
72 lines
2.9 KiB
Python
72 lines
2.9 KiB
Python
#
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
#
|
|
from dataclasses import dataclass
|
|
from typing import Any, Iterable, Tuple
|
|
|
|
from airbyte_cdk.models import SyncMode
|
|
from airbyte_cdk.sources.declarative.partition_routers.substream_partition_router import SubstreamPartitionRouter
|
|
from airbyte_cdk.sources.declarative.types import StreamSlice
|
|
from airbyte_cdk.sources.declarative.validators import ValidationStrategy
|
|
from airbyte_cdk.utils import is_cloud_environment
|
|
|
|
|
|
@dataclass
class ValidateApiUrl(ValidationStrategy):
    """Validation strategy for an API URL given as a host with an optional scheme.

    A value is accepted when it contains a dot, has no path component, and
    either carries no scheme or uses ``http`` / ``https``.
    """

    def validate(self, value: Any) -> None:
        """Check ``value`` and raise if it is not an acceptable API URL.

        Args:
            value: The URL to validate; it is treated as a string.

        Raises:
            ValueError: If ``_parse_url`` rejects the URL, or if the scheme is
                ``http`` while ``is_cloud_environment()`` returns true.
        """
        url: str = value
        is_valid, scheme, _ = self._parse_url(url=url)
        if not is_valid:
            raise ValueError("Invalid API resource locator.")
        if scheme == "http" and is_cloud_environment():
            raise ValueError("Http scheme is not allowed in this environment. Please use `https` instead.")

    @staticmethod
    def _parse_url(url: str) -> Tuple[bool, str, str]:
        """Split ``url`` into scheme and host and report whether it is acceptable.

        Args:
            url: Candidate URL, e.g. ``"example.com"`` or ``"https://example.com"``.

        Returns:
            ``(True, scheme, host)`` for an acceptable URL, where ``scheme``
            defaults to ``"https"`` when the input has none; otherwise
            ``(False, "", "")``.
        """
        invalid = (False, "", "")

        if "." not in url:
            return invalid

        # partition() splits on the first "://" only, so an input containing
        # the separator more than once is rejected by the path check below
        # instead of failing on tuple unpacking.
        scheme, separator, remainder = url.partition("://")
        if not separator:
            # No explicit scheme: the whole input is the host part.
            scheme, remainder = "https", url

        if scheme not in ("http", "https"):
            return invalid

        # Anything after the host (including a bare trailing slash) is rejected.
        if "/" in remainder:
            return invalid

        return True, scheme, remainder
|
|
|
|
|
|
@dataclass
class GroupStreamsPartitionRouter(SubstreamPartitionRouter):
    """Partition router that emits one slice per record of a chosen parent stream.

    The parent is picked by name from ``parent_stream_configs``:
    ``include_descendant_groups`` when the ``groups_list`` config value is
    truthy, otherwise the parent named ``groups_list``.
    """

    def stream_slices(self) -> Iterable[StreamSlice]:
        """Yield a ``StreamSlice`` keyed by ``id`` for every record of the selected parent.

        Raises:
            KeyError: If the required parent stream name is not configured.
        """
        # Index the configured parent streams by their name.
        streams_by_name = {}
        for parent_config in self.parent_stream_configs:
            streams_by_name[parent_config.stream.name] = parent_config.stream

        if self.config.get("groups_list"):
            parent_name = "include_descendant_groups"
        else:
            parent_name = "groups_list"
        source_stream = streams_by_name[parent_name]

        for parent_partition in source_stream.generate_partitions():
            for group in parent_partition.read():
                yield StreamSlice(partition={"id": group["id"]}, cursor_slice={})
|
|
|
|
|
|
@dataclass
class ProjectStreamsPartitionRouter(SubstreamPartitionRouter):
    """Partition router that emits one slice per project.

    Project ids are the ``path_with_namespace`` values found under each parent
    record's ``projects`` key. The ``projects_list`` config value, when
    non-empty, filters them. If the parent yields no projects at all,
    ``projects_list`` itself is used as the list of ids.
    """

    def stream_slices(self) -> Iterable[StreamSlice]:
        """Yield a ``StreamSlice`` per selected project, with ``/`` in the id encoded as ``%2F``."""
        parent_stream = self.parent_stream_configs[0].stream
        # `or []` also covers a config where the key is present but null,
        # which would otherwise make the fallback loop iterate over None.
        projects_list = self.config.get("projects_list") or []

        # Parent records must be read in full first: the fallback below
        # depends on whether any project was discovered at all.
        group_project_ids = []
        for partition in parent_stream.generate_partitions():
            for record in partition.read():
                group_project_ids.extend(project["path_with_namespace"] for project in record["projects"])

        if group_project_ids:
            # An empty projects_list means "no filter".
            selected_ids = [
                project_id for project_id in group_project_ids if not projects_list or project_id in projects_list
            ]
        else:
            selected_ids = projects_list

        for project_id in selected_ids:
            yield StreamSlice(partition={"id": project_id.replace("/", "%2F")}, cursor_slice={})
|