Source Github: Support syncing multiple repositories/organizations (#5136)
Support syncing multiple repositories/organizations Co-authored-by: ykurochkin <y.kurochkin@zazmic.com>
This commit is contained in:
@@ -45,6 +45,7 @@ from .streams import (
|
||||
Projects,
|
||||
PullRequests,
|
||||
Releases,
|
||||
Repositories,
|
||||
Reviews,
|
||||
Stargazers,
|
||||
Teams,
|
||||
@@ -52,18 +53,42 @@ from .streams import (
|
||||
|
||||
|
||||
class SourceGithub(AbstractSource):
|
||||
def _generate_repositories(self, config: Mapping[str, Any], authenticator: TokenAuthenticator) -> List[str]:
|
||||
organizations = list(filter(None, config["organization"].split(" ")))
|
||||
repositories = list(filter(None, config["repository"].split(" ")))
|
||||
|
||||
if not (organizations or repositories):
|
||||
raise Exception("Either `organisation` or `repository` need to be provided for connect to Github API")
|
||||
|
||||
repositories_list = []
|
||||
if organizations:
|
||||
repos = Repositories(authenticator=authenticator, organizations=organizations)
|
||||
for stream in repos.stream_slices(sync_mode=SyncMode.full_refresh):
|
||||
repositories_list += [
|
||||
repository["full_name"] for repository in repos.read_records(sync_mode=SyncMode.full_refresh, stream_slice=stream)
|
||||
]
|
||||
if repositories:
|
||||
repositories_list += repositories
|
||||
|
||||
return list(set(repositories_list))
|
||||
|
||||
def check_connection(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> Tuple[bool, Any]:
|
||||
try:
|
||||
authenticator = TokenAuthenticator(token=config["access_token"], auth_method="token")
|
||||
commits_stream = Commits(authenticator=authenticator, repository=config["repository"], start_date=config["start_date"])
|
||||
next(commits_stream.read_records(sync_mode=SyncMode.full_refresh))
|
||||
repositories = self._generate_repositories(config=config, authenticator=authenticator)
|
||||
|
||||
# We should use the most poorly filled stream to use the `list` method, because when using the `next` method, we can get the `StopIteration` error.
|
||||
projects_stream = Projects(authenticator=authenticator, repositories=repositories, start_date=config["start_date"])
|
||||
for stream in projects_stream.stream_slices(sync_mode=SyncMode.full_refresh):
|
||||
list(projects_stream.read_records(sync_mode=SyncMode.full_refresh, stream_slice=stream))
|
||||
return True, None
|
||||
except Exception as e:
|
||||
return False, repr(e)
|
||||
|
||||
def streams(self, config: Mapping[str, Any]) -> List[Stream]:
|
||||
authenticator = TokenAuthenticator(token=config["access_token"], auth_method="token")
|
||||
full_refresh_args = {"authenticator": authenticator, "repository": config["repository"]}
|
||||
repositories = self._generate_repositories(config=config, authenticator=authenticator)
|
||||
full_refresh_args = {"authenticator": authenticator, "repositories": repositories}
|
||||
incremental_args = {**full_refresh_args, "start_date": config["start_date"]}
|
||||
|
||||
return [
|
||||
|
||||
Reference in New Issue
Block a user