Source Gitlab: fetch groups along with subgroups (#20384)
* #11248 Source Gitlab: fetch groups along with subgroups * #11248 source gitlab: upd changelog * #11248 source gitlab: fix build * #11248 source gitlab: fix build * auto-bump connector version Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com>
This commit is contained in:
@@ -3,9 +3,8 @@
|
||||
#
|
||||
|
||||
|
||||
from typing import Any, List, Mapping, Tuple
|
||||
from typing import Any, List, Mapping, MutableMapping, Optional, Tuple, Union
|
||||
|
||||
import requests
|
||||
from airbyte_cdk.models import SyncMode
|
||||
from airbyte_cdk.sources import AbstractSource
|
||||
from airbyte_cdk.sources.streams import Stream
|
||||
@@ -23,6 +22,8 @@ from .streams import (
|
||||
GroupMilestones,
|
||||
GroupProjects,
|
||||
Groups,
|
||||
GroupsList,
|
||||
IncludeDescendantGroups,
|
||||
Issues,
|
||||
Jobs,
|
||||
MergeRequestCommits,
|
||||
@@ -40,61 +41,65 @@ from .streams import (
|
||||
|
||||
|
||||
class SourceGitlab(AbstractSource):
|
||||
def _generate_main_streams(self, config: Mapping[str, Any]) -> Tuple[GitlabStream, GitlabStream]:
|
||||
auth = TokenAuthenticator(token=config["private_token"])
|
||||
auth_params = dict(authenticator=auth, api_url=config["api_url"])
|
||||
def __init__(self, *args, **kwargs):
|
||||
super().__init__(*args, **kwargs)
|
||||
self.__auth_params: Mapping[str, Any] = {}
|
||||
self.__groups_stream: Optional[GitlabStream] = None
|
||||
self.__projects_stream: Optional[GitlabStream] = None
|
||||
|
||||
pids = list(filter(None, config.get("projects", "").split(" ")))
|
||||
gids = config.get("groups")
|
||||
def _groups_stream(self, config: MutableMapping[str, Any]) -> Groups:
|
||||
if not self.__groups_stream:
|
||||
auth_params = self._auth_params(config)
|
||||
group_ids = list(map(lambda x: x["id"], self._get_group_list(config)))
|
||||
self.__groups_stream = Groups(group_ids=group_ids, **auth_params)
|
||||
return self.__groups_stream
|
||||
|
||||
if gids:
|
||||
gids = list(filter(None, gids.split(" ")))
|
||||
else:
|
||||
gids = self._get_group_list(**auth_params)
|
||||
def _projects_stream(self, config: MutableMapping[str, Any]) -> Union[Projects, GroupProjects]:
|
||||
if not self.__projects_stream:
|
||||
auth_params = self._auth_params(config)
|
||||
project_ids = list(filter(None, config.get("projects", "").split(" ")))
|
||||
groups_stream = self._groups_stream(config)
|
||||
if groups_stream.group_ids:
|
||||
self.__projects_stream = GroupProjects(project_ids=project_ids, parent_stream=groups_stream, **auth_params)
|
||||
return self.__projects_stream
|
||||
self.__projects_stream = Projects(project_ids=project_ids, **auth_params)
|
||||
return self.__projects_stream
|
||||
|
||||
groups = Groups(group_ids=gids, **auth_params)
|
||||
if gids:
|
||||
projects = GroupProjects(project_ids=pids, parent_stream=groups, **auth_params)
|
||||
else:
|
||||
projects = Projects(project_ids=pids, **auth_params)
|
||||
def _auth_params(self, config: MutableMapping[str, Any]) -> Mapping[str, Any]:
|
||||
if not self.__auth_params:
|
||||
auth = TokenAuthenticator(token=config["private_token"])
|
||||
self.__auth_params = dict(authenticator=auth, api_url=config["api_url"])
|
||||
return self.__auth_params
|
||||
|
||||
return groups, projects
|
||||
|
||||
def _get_group_list(self, **kwargs):
|
||||
headers = kwargs["authenticator"].get_auth_header()
|
||||
|
||||
ids = []
|
||||
has_next = True
|
||||
# First request params
|
||||
per_page = 50
|
||||
next_page = 1
|
||||
|
||||
while has_next:
|
||||
response = requests.get(f'https://{kwargs["api_url"]}/api/v4/groups?page={next_page}&per_page={per_page}', headers=headers)
|
||||
next_page = response.headers.get("X-Next-Page")
|
||||
per_page = response.headers.get("X-Per-Page")
|
||||
results = response.json()
|
||||
|
||||
items = map(lambda i: i["full_path"].replace("/", "%2f"), results)
|
||||
ids.extend(items)
|
||||
has_next = "X-Next-Page" in response.headers and response.headers["X-Next-Page"] != ""
|
||||
|
||||
return ids
|
||||
def _get_group_list(self, config: MutableMapping[str, Any]) -> List[str]:
|
||||
group_ids = list(filter(None, config.get("groups", "").split(" ")))
|
||||
# Gitlab exposes different APIs to get a list of groups.
|
||||
# We use https://docs.gitlab.com/ee/api/groups.html#list-groups in case there's no group IDs in the input config.
|
||||
# This API provides full information about all available groups, including subgroups.
|
||||
#
|
||||
# In case there is a definitive list of groups IDs in the input config, the above API can not be used since
|
||||
# it does not support filtering by group ID, so we use
|
||||
# https://docs.gitlab.com/ee/api/groups.html#details-of-a-group and
|
||||
# https: //docs.gitlab.com/ee/api/groups.html#list-a-groups-descendant-groups for each group ID. The latter one does not
|
||||
# provide full group info so can only be used to retrieve alist of group IDs and pass it further to init a corresponding stream.
|
||||
auth_params = self._auth_params(config)
|
||||
stream = GroupsList(**auth_params) if not group_ids else IncludeDescendantGroups(group_ids=group_ids, **auth_params)
|
||||
for stream_slice in stream.stream_slices(sync_mode=SyncMode.full_refresh):
|
||||
yield from stream.read_records(sync_mode=SyncMode.full_refresh, stream_slice=stream_slice)
|
||||
|
||||
def check_connection(self, logger, config) -> Tuple[bool, any]:
|
||||
try:
|
||||
groups, projects = self._generate_main_streams(config)
|
||||
for stream in projects.stream_slices(sync_mode=SyncMode.full_refresh):
|
||||
next(projects.read_records(sync_mode=SyncMode.full_refresh, stream_slice=stream))
|
||||
return True, None
|
||||
projects = self._projects_stream(config)
|
||||
for stream_slice in projects.stream_slices(sync_mode=SyncMode.full_refresh):
|
||||
next(projects.read_records(sync_mode=SyncMode.full_refresh, stream_slice=stream_slice))
|
||||
return True, None
|
||||
except Exception as error:
|
||||
return False, f"Unable to connect to Gitlab API with the provided credentials - {repr(error)}"
|
||||
|
||||
def streams(self, config: Mapping[str, Any]) -> List[Stream]:
|
||||
auth = TokenAuthenticator(token=config["private_token"])
|
||||
auth_params = dict(authenticator=auth, api_url=config["api_url"])
|
||||
def streams(self, config: MutableMapping[str, Any]) -> List[Stream]:
|
||||
auth_params = self._auth_params(config)
|
||||
|
||||
groups, projects = self._generate_main_streams(config)
|
||||
groups, projects = self._groups_stream(config), self._projects_stream(config)
|
||||
pipelines = Pipelines(parent_stream=projects, start_date=config["start_date"], **auth_params)
|
||||
merge_requests = MergeRequests(parent_stream=projects, start_date=config["start_date"], **auth_params)
|
||||
epics = Epics(parent_stream=groups, **auth_params)
|
||||
|
||||
Reference in New Issue
Block a user