1
0
mirror of synced 2025-12-21 19:11:14 -05:00

🎉 Source Github: use GraphQL for reviews stream (#13989)

Signed-off-by: Sergey Chvalyuk <grubberr@gmail.com>
This commit is contained in:
Serhii Chvaliuk
2022-06-28 18:35:12 +03:00
committed by GitHub
parent 26a33dff3c
commit 4072d46a7c
17 changed files with 639 additions and 162 deletions

View File

@@ -4,17 +4,16 @@
import time
from abc import ABC, abstractmethod
from copy import deepcopy
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Union
from urllib import parse
import pendulum
import requests
from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.streams.http import HttpStream, HttpSubStream
from airbyte_cdk.sources.streams.http import HttpStream
from requests.exceptions import HTTPError
from .graphql import get_query
from .graphql import get_query_pull_requests, get_query_reviews
from .utils import getter
DEFAULT_PAGE_SIZE = 100
@@ -449,7 +448,6 @@ class PullRequests(SemiIncrementalMixin, GithubStream):
use_cache = True
large_stream = True
first_read_override_key = "first_read_override"
def __init__(self, **kwargs):
super().__init__(**kwargs)
@@ -459,7 +457,7 @@ class PullRequests(SemiIncrementalMixin, GithubStream):
"""
Decide if this is a first read or not by the presence of the state object
"""
self._first_read = not bool(stream_state) or stream_state.get(self.first_read_override_key, False)
self._first_read = not bool(stream_state)
yield from super().read_records(stream_state=stream_state, **kwargs)
def path(self, stream_slice: Mapping[str, Any] = None, **kwargs) -> str:
@@ -691,45 +689,6 @@ class ReviewComments(IncrementalMixin, GithubStream):
return f"repos/{stream_slice['repository']}/pulls/comments"
# Pull request substreams
class PullRequestSubstream(HttpSubStream, SemiIncrementalMixin, GithubStream, ABC):
    """
    Base class for streams that are read once per record of the parent PullRequests stream.

    Every slice produced by the parent is reduced to the three values a child
    request needs: the pull request number, its `updated_at` value and its repository.
    """

    def __init__(self, parent: PullRequests, **kwargs):
        super().__init__(parent=parent, **kwargs)

    def stream_slices(
        self, sync_mode: SyncMode, cursor_field: List[str] = None, stream_state: Mapping[str, Any] = None
    ) -> Iterable[Optional[Mapping[str, Any]]]:
        """
        Override the parent PullRequests stream configuration to always fetch records in ascending order
        """
        # Work on a copy so the caller's state is never mutated, then raise the
        # override flag that the parent stream checks when deciding whether this is a first read.
        state_for_parent = deepcopy(stream_state) or {}
        state_for_parent[PullRequests.first_read_override_key] = True

        for parent_slice in super().stream_slices(sync_mode=sync_mode, cursor_field=cursor_field, stream_state=state_for_parent):
            pull_request = parent_slice["parent"]
            yield {
                "pull_request_updated_at": pull_request["updated_at"],
                "pull_request_number": pull_request["number"],
                "repository": pull_request["repository"],
            }

    def read_records(
        self,
        sync_mode: SyncMode,
        cursor_field: List[str] = None,
        stream_slice: Mapping[str, Any] = None,
        stream_state: Mapping[str, Any] = None,
    ) -> Iterable[Mapping[str, Any]]:
        """
        We've already determined the list of pull requests to run the stream against.
        Skip the start_point_map and cursor_field logic in SemiIncrementalMixin.read_records.
        """
        read_arguments = dict(sync_mode=sync_mode, cursor_field=cursor_field, stream_slice=stream_slice, stream_state=stream_state)
        # super(SemiIncrementalMixin, self) resolves to the class *after* SemiIncrementalMixin in the MRO.
        yield from super(SemiIncrementalMixin, self).read_records(**read_arguments)
class PullRequestStats(SemiIncrementalMixin, GithubStream):
"""
API docs: https://docs.github.com/en/graphql/reference/objects#pullrequest
@@ -743,21 +702,33 @@ class PullRequestStats(SemiIncrementalMixin, GithubStream):
) -> str:
return "graphql"
def raise_error_from_response(self, response_json):
    """
    Raise an Exception whose message is the stringified "errors" entry of the decoded response body.

    Does nothing when the body has no "errors" key.
    """
    if "errors" not in response_json:
        return
    raise Exception(str(response_json["errors"]))
def _get_name(self, repository):
return repository["owner"]["login"] + "/" + repository["name"]
def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
nodes = response.json()["data"]["repository"]["pullRequests"]["nodes"]
for record in nodes:
record["review_comments"] = sum([node["comments"]["totalCount"] for node in record["review_comments"]["nodes"]])
record["comments"] = record["comments"]["totalCount"]
record["commits"] = record["commits"]["totalCount"]
record["repository"] = record["repository"]["name"]
if record["merged_by"]:
record["merged_by"]["type"] = record["merged_by"].pop("__typename")
yield record
self.raise_error_from_response(response_json=response.json())
repository = response.json()["data"]["repository"]
if repository:
nodes = repository["pullRequests"]["nodes"]
for record in nodes:
record["review_comments"] = sum([node["comments"]["totalCount"] for node in record["review_comments"]["nodes"]])
record["comments"] = record["comments"]["totalCount"]
record["commits"] = record["commits"]["totalCount"]
record["repository"] = self._get_name(repository)
if record["merged_by"]:
record["merged_by"]["type"] = record["merged_by"].pop("__typename")
yield record
def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
pageInfo = response.json()["data"]["repository"]["pullRequests"]["pageInfo"]
if pageInfo["hasNextPage"]:
return pageInfo["endCursor"]
repository = response.json()["data"]["repository"]
if repository:
pageInfo = repository["pullRequests"]["pageInfo"]
if pageInfo["hasNextPage"]:
return {"after": pageInfo["endCursor"]}
def request_params(
self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
@@ -771,7 +742,11 @@ class PullRequestStats(SemiIncrementalMixin, GithubStream):
next_page_token: Mapping[str, Any] = None,
) -> Optional[Mapping]:
organization, name = stream_slice["repository"].split("/")
query = get_query(owner=organization, name=name, page_size=self.page_size, next_page_token=next_page_token)
if next_page_token:
next_page_token = next_page_token["after"]
query = get_query_pull_requests(
owner=organization, name=name, first=self.page_size, after=next_page_token, direction=self.is_sorted.upper()
)
return {"query": query}
def request_headers(self, **kwargs) -> Mapping[str, Any]:
@@ -781,30 +756,96 @@ class PullRequestStats(SemiIncrementalMixin, GithubStream):
return {**base_headers, **headers}
class Reviews(PullRequestSubstream):
class Reviews(SemiIncrementalMixin, GithubStream):
"""
API docs: https://docs.github.com/en/rest/reference/pulls#list-reviews-for-a-pull-request
API docs: https://docs.github.com/en/graphql/reference/objects#pullrequestreview
"""
cursor_field = "pull_request_updated_at"
is_sorted = False
http_method = "POST"
cursor_field = "updated_at"
def __init__(self, **kwargs):
    """Initialise the stream and the in-memory pagination bookkeeping read and written by next_page_token."""
    super().__init__(**kwargs)
    # repository name -> endCursor of the pull requests page still to be fetched
    self.pull_requests_cursor = {}
    # repository name -> {pull request number -> endCursor of that pull request's reviews still to be fetched}
    self.reviews_cursors = {}
def path(
self, stream_state: Mapping[str, Any] = None, stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
self, *, stream_state: Mapping[str, Any] = None, stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
) -> str:
return f"repos/{stream_slice['repository']}/pulls/{stream_slice['pull_request_number']}/reviews"
return "graphql"
# Set the parent stream state's cursor field before fetching its records
def stream_slices(self, stream_state: Mapping[str, Any] = None, **kwargs) -> Iterable[Optional[Mapping[str, Any]]]:
    """
    Yield the parent's slices after copying this stream's cursor value into the parent's cursor field.

    The incoming state is deep-copied, so the caller's mapping is left untouched.
    """
    parent_state = deepcopy(stream_state) or {}
    for repository in self.repositories:
        if repository not in parent_state:
            continue
        repository_state = parent_state[repository]
        if self.cursor_field in repository_state:
            repository_state[self.parent.cursor_field] = repository_state[self.cursor_field]
    yield from super().stream_slices(stream_state=parent_state, **kwargs)
def request_params(
    self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
) -> MutableMapping[str, Any]:
    """Return an empty mapping: this stream sends no query-string parameters, whatever the arguments are."""
    no_params: MutableMapping[str, Any] = {}
    return no_params
def transform(self, record: MutableMapping[str, Any], stream_slice: Mapping[str, Any]) -> MutableMapping[str, Any]:
    """
    Apply the inherited transformation, then stamp the record with the slice's cursor value.

    The value is read from `stream_slice` under this stream's `cursor_field` key
    and written to the record under the same key.
    """
    transformed = super().transform(record=record, stream_slice=stream_slice)
    cursor_key = self.cursor_field
    transformed[cursor_key] = stream_slice[cursor_key]
    return transformed
def raise_error_from_response(self, response_json):
    """
    Raise an Exception whose message is the stringified "errors" entry of the decoded response body.

    Does nothing when the body has no "errors" key.
    """
    if "errors" not in response_json:
        return
    raise Exception(str(response_json["errors"]))
def _get_records(self, pull_request, repository_name):
"yield review records from pull_request"
for record in pull_request["reviews"]["nodes"]:
record["repository"] = repository_name
record["pull_request_url"] = pull_request["url"]
if record["commit"]:
record["commit_id"] = record.pop("commit")["oid"]
record["user"]["type"] = record["user"].pop("__typename")
# for backward compatibility with REST API response
record["_links"] = {
"html": {"href": record["html_url"]},
"pull_request": {"href": record["pull_request_url"]},
}
yield record
def _get_name(self, repository):
return repository["owner"]["login"] + "/" + repository["name"]
def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
    """
    Yield review records from a GraphQL response.

    Two response shapes are handled:
      * "pullRequests": a page of pull requests, each carrying its own reviews;
      * "pullRequest": a single pull request with a further page of its reviews.

    Nothing is yielded when "repository" is empty in the response data.

    :raises Exception: when the response body contains an "errors" entry
    """
    # Decode the body once; every response.json() call deserialises the payload again.
    response_json = response.json()
    self.raise_error_from_response(response_json=response_json)
    repository = response_json["data"]["repository"]
    if not repository:
        return
    repository_name = self._get_name(repository)
    if "pullRequests" in repository:
        for pull_request in repository["pullRequests"]["nodes"]:
            yield from self._get_records(pull_request, repository_name)
    elif "pullRequest" in repository:
        yield from self._get_records(repository["pullRequest"], repository_name)
def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
    """
    Decide which GraphQL page to request next for the repository in `response`.

    Pending cursors are recorded on the instance first; a pending reviews cursor is
    then handed out before the pull requests cursor.

    :return: {"after": cursor, "number": pull_request_number} for a further page of one
        pull request's reviews, {"after": cursor} for the next page of pull requests,
        or None (implicitly) when nothing is pending or "repository" is empty.
    """
    repository = response.json()["data"]["repository"]
    if repository:
        repository_name = self._get_name(repository)
        # Per-repository mapping: pull request number -> cursor of its next reviews page.
        reviews_cursors = self.reviews_cursors.setdefault(repository_name, {})
        if "pullRequests" in repository:
            # Response is a page of pull requests: remember where the next page starts ...
            if repository["pullRequests"]["pageInfo"]["hasNextPage"]:
                self.pull_requests_cursor[repository_name] = repository["pullRequests"]["pageInfo"]["endCursor"]
            # ... and note every pull request on this page that still has reviews to fetch.
            for pull_request in repository["pullRequests"]["nodes"]:
                if pull_request["reviews"]["pageInfo"]["hasNextPage"]:
                    pull_request_number = pull_request["number"]
                    reviews_cursors[pull_request_number] = pull_request["reviews"]["pageInfo"]["endCursor"]
        elif "pullRequest" in repository:
            # Response is a reviews page of a single pull request: keep going while it has more.
            if repository["pullRequest"]["reviews"]["pageInfo"]["hasNextPage"]:
                pull_request_number = repository["pullRequest"]["number"]
                reviews_cursors[pull_request_number] = repository["pullRequest"]["reviews"]["pageInfo"]["endCursor"]
        # Pending reviews pages are consumed first; popitem() removes the entry it returns.
        if reviews_cursors:
            number, after = reviews_cursors.popitem()
            return {"after": after, "number": number}
        # No reviews pending: move on to the next page of pull requests, if one was recorded.
        if repository_name in self.pull_requests_cursor:
            return {"after": self.pull_requests_cursor.pop(repository_name)}
def request_body_json(
    self,
    stream_state: Mapping[str, Any],
    stream_slice: Mapping[str, Any] = None,
    next_page_token: Mapping[str, Any] = None,
) -> Optional[Mapping]:
    """
    Build the JSON body of the GraphQL request for the repository named in `stream_slice`.

    The slice's "repository" value is split on "/" into owner and name. The entries of
    `next_page_token` are forwarded to the query builder as keyword arguments; when the
    token is missing or empty, {"after": None} is forwarded instead.

    :return: {"query": <query built by get_query_reviews>}
    """
    owner, repository_name = stream_slice["repository"].split("/")
    paging_arguments = next_page_token if next_page_token else {"after": None}
    return {"query": get_query_reviews(owner=owner, name=repository_name, first=self.page_size, **paging_arguments)}
class PullRequestCommits(GithubStream):