
🐛 Source Github: test coverage more than 90% (#10967)

Signed-off-by: Sergey Chvalyuk <grubberr@gmail.com>
Author: Serhii Chvaliuk
Date: 2022-03-19 13:14:53 +02:00, committed by GitHub
Parent: 298551d501
Commit: 0f475ce6ff
7 changed files with 718 additions and 52 deletions

View File: integration_tests/configured_catalog.json

@@ -15,7 +15,7 @@
"name": "branches",
"json_schema": {},
"supported_sync_modes": ["full_refresh"],
"source_defined_primary_key": [["id"]]
"source_defined_primary_key": [["repository"], ["name"]]
},
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite"
@@ -336,7 +336,7 @@
"name": "tags",
"json_schema": {},
"supported_sync_modes": ["full_refresh"],
"source_defined_primary_key": [["id"]]
"source_defined_primary_key": [["repository"], ["name"]]
},
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite"

View File: setup.py

@@ -10,7 +10,7 @@ MAIN_REQUIREMENTS = [
"vcrpy==4.1.1",
]
- TEST_REQUIREMENTS = ["pytest~=6.1", "source-acceptance-test", "responses==0.13.3"]
+ TEST_REQUIREMENTS = ["pytest~=6.1", "source-acceptance-test", "responses~=0.19.0"]
setup(
name="source_github",

View File: source_github/streams.py

@@ -5,7 +5,6 @@
import time
from abc import ABC, abstractmethod
from copy import deepcopy
- from time import sleep
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Union
from urllib import parse
@@ -72,7 +71,7 @@ class GithubStream(HttpStream, ABC):
elif response.headers.get("Retry-After"):
time_delay = int(response.headers["Retry-After"])
self.logger.info(f"Handling Secondary Rate limits, setting sync delay for {time_delay} second(s)")
- sleep(time_delay)
+ time.sleep(time_delay)
return retry_flag
def backoff_time(self, response: requests.Response) -> Union[int, float]:
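
Dropping the from time import sleep alias in favor of a module-qualified time.sleep call is what makes the secondary-rate-limit delay testable: unittest.mock.patch("time.sleep") replaces the attribute on the time module, which a call through a direct import would bypass. A minimal sketch:

import time
from unittest.mock import patch

with patch("time.sleep") as time_mock:
    time.sleep(10)  # looked up on the module at call time, so intercepted
assert time_mock.call_args[0][0] == 10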
@@ -215,13 +214,13 @@ class SemiIncrementalGithubStream(GithubStream):
current_stream_state[current_repository] = {self.cursor_field: state_value}
return current_stream_state
- def get_starting_point(self, stream_state: Mapping[str, Any], repository: str) -> str:
- start_point = self._start_date
- if stream_state and stream_state.get(repository, {}).get(self.cursor_field):
- start_point = max(start_point, stream_state[repository][self.cursor_field])
- return start_point
+ def get_starting_point(self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any]) -> str:
+ if stream_state:
+ repository = stream_slice["repository"]
+ stream_state_value = stream_state.get(repository, {}).get(self.cursor_field)
+ if stream_state_value:
+ return max(self._start_date, stream_state_value)
+ return self._start_date
def read_records(
self,
@@ -230,7 +229,7 @@ class SemiIncrementalGithubStream(GithubStream):
stream_slice: Mapping[str, Any] = None,
stream_state: Mapping[str, Any] = None,
) -> Iterable[Mapping[str, Any]]:
- start_point = self.get_starting_point(stream_state=stream_state, repository=stream_slice["repository"])
+ start_point = self.get_starting_point(stream_state=stream_state, stream_slice=stream_slice)
for record in super().read_records(
sync_mode=sync_mode, cursor_field=cursor_field, stream_slice=stream_slice, stream_state=stream_state
):
@@ -243,7 +242,7 @@ class SemiIncrementalGithubStream(GithubStream):
class IncrementalGithubStream(SemiIncrementalGithubStream):
def request_params(self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, **kwargs) -> MutableMapping[str, Any]:
params = super().request_params(stream_state=stream_state, **kwargs)
- since_params = self.get_starting_point(stream_state=stream_state, repository=stream_slice["repository"])
+ since_params = self.get_starting_point(stream_state=stream_state, stream_slice=stream_slice)
if since_params:
params["since"] = since_params
return params
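
Threading the whole stream_slice through get_starting_point, instead of a bare repository string, gives every incremental stream one signature, so subclasses such as Commits can read extra slice fields (the branch) without changing these call sites. A sketch of the lookup against the per-repository state shape used throughout the tests (values hypothetical):

stream_state = {"org/repo": {"updated_at": "2022-02-02T10:10:08Z"}}
stream_slice = {"repository": "org/repo"}
start_date = "2022-02-02T10:10:03Z"
value = stream_state.get(stream_slice["repository"], {}).get("updated_at")
assert (max(start_date, value) if value else start_date) == "2022-02-02T10:10:08Z"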
@@ -276,7 +275,7 @@ class Branches(GithubStream):
API docs: https://docs.github.com/en/rest/reference/repos#list-branches
"""
- primary_key = None
+ primary_key = ["repository", "name"]
def path(self, stream_slice: Mapping[str, Any] = None, **kwargs) -> str:
return f"repos/{stream_slice['repository']}/branches"
@@ -342,7 +341,7 @@ class Tags(GithubStream):
API docs: https://docs.github.com/en/rest/reference/repos#list-repository-tags
"""
- primary_key = None
+ primary_key = ["repository", "name"]
def path(self, stream_slice: Mapping[str, Any] = None, **kwargs) -> str:
return f"repos/{stream_slice['repository']}/tags"
@@ -561,9 +560,7 @@ class Commits(IncrementalGithubStream):
def request_params(self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, **kwargs) -> MutableMapping[str, Any]:
params = super(IncrementalGithubStream, self).request_params(stream_state=stream_state, stream_slice=stream_slice, **kwargs)
params["since"] = self.get_starting_point(
stream_state=stream_state, repository=stream_slice["repository"], branch=stream_slice["branch"]
)
params["since"] = self.get_starting_point(stream_state=stream_state, stream_slice=stream_slice)
params["sha"] = stream_slice["branch"]
return params
@@ -607,31 +604,16 @@ class Commits(IncrementalGithubStream):
current_stream_state[current_repository][current_branch] = {self.cursor_field: state_value}
return current_stream_state
- def get_starting_point(self, stream_state: Mapping[str, Any], repository: str, branch: str) -> str:
- start_point = self._start_date
- if stream_state and stream_state.get(repository, {}).get(branch, {}).get(self.cursor_field):
- return max(start_point, stream_state[repository][branch][self.cursor_field])
+ def get_starting_point(self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any]) -> str:
+ repository = stream_slice["repository"]
+ branch = stream_slice["branch"]
+ if stream_state:
+ stream_state_value = stream_state.get(repository, {}).get(branch, {}).get(self.cursor_field)
+ if stream_state_value:
+ return max(self._start_date, stream_state_value)
if branch == self.default_branches[repository]:
- return super().get_starting_point(stream_state=stream_state, repository=repository)
- return start_point
- def read_records(
- self,
- sync_mode: SyncMode,
- cursor_field: List[str] = None,
- stream_slice: Mapping[str, Any] = None,
- stream_state: Mapping[str, Any] = None,
- ) -> Iterable[Mapping[str, Any]]:
- start_point = self.get_starting_point(
- stream_state=stream_state, repository=stream_slice["repository"], branch=stream_slice["branch"]
- )
- for record in super(SemiIncrementalGithubStream, self).read_records(
- sync_mode=sync_mode, cursor_field=cursor_field, stream_slice=stream_slice, stream_state=stream_state
- ):
- if record[self.cursor_field] > start_point:
- yield record
- elif self.is_sorted_descending and record[self.cursor_field] < start_point:
- break
+ return super().get_starting_point(stream_state=stream_state, stream_slice=stream_slice)
+ return self._start_date
class Issues(IncrementalGithubStream):
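
The rewritten Commits.get_starting_point keeps a cursor per repository and per branch, while the default branch still falls back to the parent class lookup, so repository-level state written by earlier connector versions continues to advance the sync. The two state shapes side by side (values hypothetical):

# Per-branch state written from now on:
{"org/repo": {"master": {"created_at": "2022-02-02T10:10:04Z"}}}
# Legacy repository-level state, still honored for the default branch:
{"org/repo": {"created_at": "2022-02-02T10:10:02Z"}}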

View File: unit_tests/test_source.py

@@ -4,6 +4,7 @@
from unittest.mock import MagicMock
import pytest
import responses
from airbyte_cdk.models import AirbyteConnectionStatus, Status
from source_github.source import SourceGithub
@@ -52,3 +53,69 @@ def test_check_connection_org_only():
assert status.status == Status.SUCCEEDED
# One request to check organization
assert len(responses.calls) == 1
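# The two calls below cover both branch-selection modes: an empty selection
# falls back to the repository's default branch, while an explicit list keeps
# only branches the API actually returned (feature/branch_3 does not exist in
# the mocked response, so it is dropped).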
@responses.activate
def test_get_branches_data():
repository_args = {"repositories": ["airbytehq/integration-test"], "page_size_for_large_streams": 10}
source = SourceGithub()
responses.add(
"GET",
"https://api.github.com/repos/airbytehq/integration-test",
json={"full_name": "airbytehq/integration-test", "default_branch": "master"},
)
responses.add(
"GET",
"https://api.github.com/repos/airbytehq/integration-test/branches",
json=[
{"repository": "airbytehq/integration-test", "name": "feature/branch_0"},
{"repository": "airbytehq/integration-test", "name": "feature/branch_1"},
{"repository": "airbytehq/integration-test", "name": "feature/branch_2"},
{"repository": "airbytehq/integration-test", "name": "master"},
],
)
default_branches, branches_to_pull = source._get_branches_data("", repository_args)
assert default_branches == {"airbytehq/integration-test": "master"}
assert branches_to_pull == {"airbytehq/integration-test": ["master"]}
default_branches, branches_to_pull = source._get_branches_data(
"airbytehq/integration-test/feature/branch_0 airbytehq/integration-test/feature/branch_1 airbytehq/integration-test/feature/branch_3",
repository_args,
)
assert default_branches == {"airbytehq/integration-test": "master"}
assert len(branches_to_pull["airbytehq/integration-test"]) == 2
assert "feature/branch_0" in branches_to_pull["airbytehq/integration-test"]
assert "feature/branch_1" in branches_to_pull["airbytehq/integration-test"]
@responses.activate
def test_generate_repositories():
source = SourceGithub()
with pytest.raises(Exception):
config = {"repository": ""}
source._generate_repositories(config, authenticator=None)
responses.add(
"GET",
"https://api.github.com/orgs/docker/repos",
json=[
{"full_name": "docker/docker-py"},
{"full_name": "docker/compose"},
],
)
config = {"repository": "airbytehq/integration-test docker/*"}
repositories_list, organisation_repos = source._generate_repositories(config, authenticator=None)
assert repositories_list == ["airbytehq/integration-test"]
assert len(organisation_repos) == 2
assert "docker/compose" in organisation_repos
assert "docker/docker-py" in organisation_repos

View File: unit_tests/test_stream.py

@@ -9,9 +9,33 @@ import pytest
import requests
import responses
from airbyte_cdk.sources.streams.http.exceptions import BaseBackoffException
- from source_github.streams import Projects, PullRequestCommentReactions, Repositories, Teams
+ from responses import matchers
+ from source_github.streams import (
+ Branches,
+ Collaborators,
+ Comments,
+ CommitComments,
+ Commits,
+ Deployments,
+ IssueEvents,
+ IssueLabels,
+ IssueMilestones,
+ Organizations,
+ ProjectCards,
+ ProjectColumns,
+ Projects,
+ PullRequestCommentReactions,
+ PullRequestCommits,
+ PullRequests,
+ Releases,
+ Repositories,
+ Stargazers,
+ Tags,
+ Teams,
+ Users,
+ )
- from .utils import read_full_refresh
+ from .utils import ProjectsResponsesAPI, read_full_refresh, read_incremental
DEFAULT_BACKOFF_DELAYS = [5, 10, 20, 40, 80]
@@ -52,10 +76,21 @@ def test_backoff_time(http_status, response_text, expected_backoff_time):
assert stream.backoff_time(response_mock) == expected_backoff_time
+ @responses.activate
+ @patch("time.sleep")
+ def test_retry_after(time_mock):
+ stream = Organizations(organizations=["airbytehq"])
+ responses.add("GET", "https://api.github.com/orgs/airbytehq", json={"login": "airbytehq"}, headers={"Retry-After": "10"})
+ read_full_refresh(stream)
+ assert time_mock.call_args[0][0] == 10
+ assert len(responses.calls) == 1
+ assert responses.calls[0].request.url == "https://api.github.com/orgs/airbytehq?per_page=100"
@responses.activate
def test_stream_teams_404():
- kwargs = {"organizations": ["org_name"]}
- stream = Teams(**kwargs)
+ organization_args = {"organizations": ["org_name"]}
+ stream = Teams(**organization_args)
responses.add(
"GET",
@@ -69,10 +104,46 @@ def test_stream_teams_404():
assert responses.calls[0].request.url == "https://api.github.com/orgs/org_name/teams?per_page=100"
@responses.activate
def test_stream_organizations_read():
organization_args = {"organizations": ["org1", "org2"]}
stream = Organizations(**organization_args)
responses.add("GET", "https://api.github.com/orgs/org1", json={"id": 1})
responses.add("GET", "https://api.github.com/orgs/org2", json={"id": 2})
records = read_full_refresh(stream)
assert records == [{"id": 1}, {"id": 2}]
@responses.activate
def test_stream_teams_read():
organization_args = {"organizations": ["org1", "org2"]}
stream = Teams(**organization_args)
responses.add("GET", "https://api.github.com/orgs/org1/teams", json=[{"id": 1}, {"id": 2}])
responses.add("GET", "https://api.github.com/orgs/org2/teams", json=[{"id": 3}])
records = read_full_refresh(stream)
assert records == [{"id": 1, "organization": "org1"}, {"id": 2, "organization": "org1"}, {"id": 3, "organization": "org2"}]
assert len(responses.calls) == 2
assert responses.calls[0].request.url == "https://api.github.com/orgs/org1/teams?per_page=100"
assert responses.calls[1].request.url == "https://api.github.com/orgs/org2/teams?per_page=100"
@responses.activate
def test_stream_users_read():
organization_args = {"organizations": ["org1", "org2"]}
stream = Users(**organization_args)
responses.add("GET", "https://api.github.com/orgs/org1/members", json=[{"id": 1}, {"id": 2}])
responses.add("GET", "https://api.github.com/orgs/org2/members", json=[{"id": 3}])
records = read_full_refresh(stream)
assert records == [{"id": 1, "organization": "org1"}, {"id": 2, "organization": "org1"}, {"id": 3, "organization": "org2"}]
assert len(responses.calls) == 2
assert responses.calls[0].request.url == "https://api.github.com/orgs/org1/members?per_page=100"
assert responses.calls[1].request.url == "https://api.github.com/orgs/org2/members?per_page=100"
@responses.activate
def test_stream_repositories_404():
- kwargs = {"organizations": ["org_name"]}
- stream = Repositories(**kwargs)
+ organization_args = {"organizations": ["org_name"]}
+ stream = Repositories(**organization_args)
responses.add(
"GET",
@@ -87,10 +158,24 @@ def test_stream_repositories_404():
@responses.activate
- def test_stream_projects_disabled():
- kwargs = {"start_date": "start_date", "page_size_for_large_streams": 30, "repositories": ["test_repo"]}
- stream = Projects(**kwargs)
+ def test_stream_repositories_read():
+ organization_args = {"organizations": ["org1", "org2"]}
+ stream = Repositories(**organization_args)
+ responses.add("GET", "https://api.github.com/orgs/org1/repos", json=[{"id": 1}, {"id": 2}])
+ responses.add("GET", "https://api.github.com/orgs/org2/repos", json=[{"id": 3}])
+ records = read_full_refresh(stream)
+ assert records == [{"id": 1, "organization": "org1"}, {"id": 2, "organization": "org1"}, {"id": 3, "organization": "org2"}]
+ assert len(responses.calls) == 2
+ assert responses.calls[0].request.url == "https://api.github.com/orgs/org1/repos?per_page=100"
+ assert responses.calls[1].request.url == "https://api.github.com/orgs/org2/repos?per_page=100"
+ @responses.activate
+ def test_stream_projects_disabled():
+ repository_args_with_start_date = {"start_date": "start_date", "page_size_for_large_streams": 30, "repositories": ["test_repo"]}
+ stream = Projects(**repository_args_with_start_date)
responses.add(
"GET",
"https://api.github.com/repos/test_repo/projects",
@@ -101,3 +186,476 @@ def test_stream_projects_disabled():
assert read_full_refresh(stream) == []
assert len(responses.calls) == 1
assert responses.calls[0].request.url == "https://api.github.com/repos/test_repo/projects?per_page=100&state=all"
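# PullRequests pages oldest-first (direction=asc) on a first sync and
# newest-first (direction=desc) once state exists, so both orderings are
# mocked below; records not newer than the cutoff are filtered out, and a
# descending read stops as soon as it crosses the saved state.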
@responses.activate
def test_stream_pull_requests_incremental_read():
page_size = 2
repository_args_with_start_date = {
"repositories": ["organization/repository"],
"page_size_for_large_streams": page_size,
"start_date": "2022-02-02T10:10:03Z",
}
stream = PullRequests(**repository_args_with_start_date)
data = [
{"id": 1, "updated_at": "2022-02-02T10:10:02Z"},
{"id": 2, "updated_at": "2022-02-02T10:10:04Z"},
{"id": 3, "updated_at": "2022-02-02T10:10:06Z"},
{"id": 4, "updated_at": "2022-02-02T10:10:08Z"},
{"id": 5, "updated_at": "2022-02-02T10:10:10Z"},
{"id": 6, "updated_at": "2022-02-02T10:10:12Z"},
]
api_url = "https://api.github.com/repos/organization/repository/pulls"
responses.add(
"GET",
api_url,
json=data[0:2],
headers={"Link": '<https://api.github.com/repositories/400052213/pulls?page=2>; rel="next"'},
match=[matchers.query_param_matcher({"per_page": str(page_size), "direction": "asc"}, strict_match=False)],
)
responses.add(
"GET",
api_url,
json=data[2:4],
match=[matchers.query_param_matcher({"per_page": str(page_size), "direction": "asc", "page": "2"}, strict_match=False)],
)
responses.add(
"GET",
api_url,
json=data[5:3:-1],
headers={"Link": '<https://api.github.com/repositories/400052213/pulls?page=2>; rel="next"'},
match=[matchers.query_param_matcher({"per_page": str(page_size), "direction": "desc"}, strict_match=False)],
)
responses.add(
"GET",
api_url,
json=data[3:1:-1],
headers={"Link": '<https://api.github.com/repositories/400052213/pulls?page=3>; rel="next"'},
match=[matchers.query_param_matcher({"per_page": str(page_size), "direction": "desc", "page": "2"}, strict_match=False)],
)
stream_state = {}
records = read_incremental(stream, stream_state)
assert [r["id"] for r in records] == [2, 3, 4]
assert stream_state == {"organization/repository": {"updated_at": "2022-02-02T10:10:08Z"}}
records = read_incremental(stream, stream_state)
assert [r["id"] for r in records] == [6, 5]
assert stream_state == {"organization/repository": {"updated_at": "2022-02-02T10:10:12Z"}}
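# Commits is queried once per branch, passing the starting point as `since`
# and the branch name as `sha`; state advances to the newest commit date seen
# on each branch.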
@responses.activate
def test_stream_commits_incremental_read():
repository_args_with_start_date = {
"repositories": ["organization/repository"],
"page_size_for_large_streams": 100,
"start_date": "2022-02-02T10:10:03Z",
}
default_branches = {"organization/repository": "master"}
branches_to_pull = {"organization/repository": ["branch"]}
stream = Commits(**repository_args_with_start_date, branches_to_pull=branches_to_pull, default_branches=default_branches)
data = [
{"sha": 1, "commit": {"author": {"date": "2022-02-02T10:10:02Z"}}},
{"sha": 2, "commit": {"author": {"date": "2022-02-02T10:10:04Z"}}},
{"sha": 3, "commit": {"author": {"date": "2022-02-02T10:10:06Z"}}},
{"sha": 4, "commit": {"author": {"date": "2022-02-02T10:10:08Z"}}},
{"sha": 5, "commit": {"author": {"date": "2022-02-02T10:10:10Z"}}},
]
api_url = "https://api.github.com/repos/organization/repository/commits"
responses.add(
"GET",
api_url,
json=data[0:3],
match=[matchers.query_param_matcher({"since": "2022-02-02T10:10:03Z", "sha": "branch"}, strict_match=False)],
)
responses.add(
"GET",
api_url,
json=data[3:5],
match=[matchers.query_param_matcher({"since": "2022-02-02T10:10:06Z", "sha": "branch"}, strict_match=False)],
)
stream_state = {}
records = read_incremental(stream, stream_state)
assert [r["sha"] for r in records] == [2, 3]
assert stream_state == {"organization/repository": {"branch": {"created_at": "2022-02-02T10:10:06Z"}}}
records = read_incremental(stream, stream_state)
assert [r["sha"] for r in records] == [4, 5]
assert stream_state == {"organization/repository": {"branch": {"created_at": "2022-02-02T10:10:10Z"}}}
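# Feeds the legacy repository-level state shape in and expects the new
# per-branch shape out, exercising the default-branch fallback in
# Commits.get_starting_point.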
@responses.activate
def test_stream_commits_state_upgrade():
repository_args_with_start_date = {
"repositories": ["organization/repository"],
"page_size_for_large_streams": 100,
"start_date": "2022-02-02T10:10:02Z",
}
default_branches = {"organization/repository": "master"}
branches_to_pull = {"organization/repository": ["master"]}
stream = Commits(**repository_args_with_start_date, branches_to_pull=branches_to_pull, default_branches=default_branches)
responses.add(
"GET",
"https://api.github.com/repos/organization/repository/commits",
json=[
{"sha": 1, "commit": {"author": {"date": "2022-02-02T10:10:02Z"}}},
{"sha": 2, "commit": {"author": {"date": "2022-02-02T10:10:04Z"}}},
],
match=[matchers.query_param_matcher({"since": "2022-02-02T10:10:02Z", "sha": "master"}, strict_match=False)],
)
stream_state = {"organization/repository": {"created_at": "2022-02-02T10:10:02Z"}}
records = read_incremental(stream, stream_state)
assert [r["sha"] for r in records] == [2]
assert stream_state == {"organization/repository": {"master": {"created_at": "2022-02-02T10:10:04Z"}}}
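# PullRequestCommits fetches commits per pull request. PR 1 is skipped below
# because its updated_at does not exceed the parent stream's start date, so
# only PRs 2 and 3 receive a pulls/{number}/commits request.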
@responses.activate
def test_stream_pull_request_commits():
repository_args = {
"repositories": ["organization/repository"],
"page_size_for_large_streams": 100,
}
repository_args_with_start_date = {**repository_args, "start_date": "2022-02-02T10:10:02Z"}
stream = PullRequestCommits(PullRequests(**repository_args_with_start_date), **repository_args)
responses.add(
"GET",
"https://api.github.com/repos/organization/repository/pulls",
json=[
{"id": 1, "updated_at": "2022-02-02T10:10:02Z", "number": 1},
{"id": 2, "updated_at": "2022-02-02T10:10:04Z", "number": 2},
{"id": 3, "updated_at": "2022-02-02T10:10:06Z", "number": 3},
],
)
responses.add(
"GET",
"https://api.github.com/repos/organization/repository/pulls/2/commits",
json=[{"sha": 1}, {"sha": 2}],
)
responses.add(
"GET",
"https://api.github.com/repos/organization/repository/pulls/3/commits",
json=[{"sha": 3}, {"sha": 4}],
)
records = read_full_refresh(stream)
assert records == [
{"sha": 1, "repository": "organization/repository", "pull_number": 2},
{"sha": 2, "repository": "organization/repository", "pull_number": 2},
{"sha": 3, "repository": "organization/repository", "pull_number": 3},
{"sha": 4, "repository": "organization/repository", "pull_number": 3},
]
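# ProjectColumns nests under Projects: a project whose updated_at predates
# the start date is never expanded, columns are filtered by the same cursor,
# and state is kept per repository and per project id.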
@responses.activate
def test_stream_project_columns():
repository_args_with_start_date = {
"repositories": ["organization/repository"],
"page_size_for_large_streams": 100,
"start_date": "2022-02-01T00:00:00Z",
}
data = [
{
"updated_at": "2022-01-01T10:00:00Z",
},
{
"updated_at": "2022-03-01T10:00:00Z",
"columns": [
{"updated_at": "2022-01-01T10:00:00Z"},
{"updated_at": "2022-03-01T09:00:00Z"},
{"updated_at": "2022-03-01T10:00:00Z"},
],
},
{
"updated_at": "2022-05-01T10:00:00Z",
"columns": [
{"updated_at": "2022-01-01T10:00:00Z"},
{"updated_at": "2022-05-01T10:00:00Z"},
],
},
]
ProjectsResponsesAPI.register(data)
stream = ProjectColumns(Projects(**repository_args_with_start_date), **repository_args_with_start_date)
stream_state = {}
records = read_incremental(stream, stream_state=stream_state)
assert records == [
{"id": 22, "name": "column_22", "project_id": 2, "repository": "organization/repository", "updated_at": "2022-03-01T09:00:00Z"},
{"id": 23, "name": "column_23", "project_id": 2, "repository": "organization/repository", "updated_at": "2022-03-01T10:00:00Z"},
{"id": 32, "name": "column_32", "project_id": 3, "repository": "organization/repository", "updated_at": "2022-05-01T10:00:00Z"},
]
assert stream_state == {
"organization/repository": {"2": {"updated_at": "2022-03-01T10:00:00Z"}, "3": {"updated_at": "2022-05-01T10:00:00Z"}}
}
data = [
{"updated_at": "2022-01-01T10:00:00Z"},
{
"updated_at": "2022-04-01T10:00:00Z",
"columns": [
{"updated_at": "2022-01-01T10:00:00Z"},
{"updated_at": "2022-03-01T09:00:00Z"},
{"updated_at": "2022-03-01T10:00:00Z"},
{"updated_at": "2022-04-01T10:00:00Z"},
],
},
{
"updated_at": "2022-05-01T10:00:00Z",
"columns": [
{"updated_at": "2022-01-01T10:00:00Z"},
{"updated_at": "2022-05-01T10:00:00Z"},
],
},
{
"updated_at": "2022-06-01T10:00:00Z",
"columns": [{"updated_at": "2022-06-01T10:00:00Z"}],
},
]
ProjectsResponsesAPI.register(data)
records = read_incremental(stream, stream_state=stream_state)
assert records == [
{"id": 24, "name": "column_24", "project_id": 2, "repository": "organization/repository", "updated_at": "2022-04-01T10:00:00Z"},
{"id": 41, "name": "column_41", "project_id": 4, "repository": "organization/repository", "updated_at": "2022-06-01T10:00:00Z"},
]
assert stream_state == {
"organization/repository": {
"2": {"updated_at": "2022-04-01T10:00:00Z"},
"3": {"updated_at": "2022-05-01T10:00:00Z"},
"4": {"updated_at": "2022-06-01T10:00:00Z"},
}
}
@responses.activate
def test_stream_project_cards():
repository_args_with_start_date = {
"repositories": ["organization/repository"],
"page_size_for_large_streams": 100,
"start_date": "2022-03-01T00:00:00Z",
}
projects_stream = Projects(**repository_args_with_start_date)
project_columns_stream = ProjectColumns(projects_stream, **repository_args_with_start_date)
stream = ProjectCards(project_columns_stream, **repository_args_with_start_date)
data = [
{
"updated_at": "2022-01-01T00:00:00Z",
},
{
"updated_at": "2022-06-01T00:00:00Z",
"columns": [
{
"updated_at": "2022-04-01T00:00:00Z",
"cards": [
{"updated_at": "2022-03-01T00:00:00Z"},
{"updated_at": "2022-04-01T00:00:00Z"},
],
},
{"updated_at": "2022-05-01T09:00:00Z"},
{
"updated_at": "2022-06-01T00:00:00Z",
"cards": [
{"updated_at": "2022-05-01T00:00:00Z"},
{"updated_at": "2022-06-01T00:00:00Z"},
],
},
],
},
{
"updated_at": "2022-05-01T00:00:00Z",
"columns": [
{"updated_at": "2022-01-01T00:00:00Z"},
{
"updated_at": "2022-05-01T00:00:00Z",
"cards": [
{"updated_at": "2022-02-01T00:00:00Z"},
{"updated_at": "2022-05-01T00:00:00Z"},
],
},
],
},
]
ProjectsResponsesAPI.register(data)
stream_state = {}
records = read_incremental(stream, stream_state=stream_state)
assert records == [
{
"column_id": 21,
"id": 212,
"name": "card_212",
"project_id": 2,
"repository": "organization/repository",
"updated_at": "2022-04-01T00:00:00Z",
},
{
"column_id": 23,
"id": 231,
"name": "card_231",
"project_id": 2,
"repository": "organization/repository",
"updated_at": "2022-05-01T00:00:00Z",
},
{
"column_id": 23,
"id": 232,
"name": "card_232",
"project_id": 2,
"repository": "organization/repository",
"updated_at": "2022-06-01T00:00:00Z",
},
{
"column_id": 32,
"id": 322,
"name": "card_322",
"project_id": 3,
"repository": "organization/repository",
"updated_at": "2022-05-01T00:00:00Z",
},
]
@responses.activate
def test_stream_comments():
repository_args_with_start_date = {
"repositories": ["organization/repository"],
"page_size_for_large_streams": 100,
"start_date": "2022-02-02T10:10:03Z",
}
stream = Comments(**repository_args_with_start_date)
data = [
{"id": 1, "updated_at": "2022-02-02T10:10:02Z"},
{"id": 2, "updated_at": "2022-02-02T10:10:04Z"},
{"id": 3, "updated_at": "2022-02-02T10:10:06Z"},
{"id": 4, "updated_at": "2022-02-02T10:10:08Z"},
]
api_url = "https://api.github.com/repos/organization/repository/issues/comments"
responses.add(
"GET",
api_url,
json=data[0:2],
match=[matchers.query_param_matcher({"since": "2022-02-02T10:10:03Z"}, strict_match=False)],
)
responses.add(
"GET",
api_url,
json=data[2:4],
match=[matchers.query_param_matcher({"since": "2022-02-02T10:10:04Z"}, strict_match=False)],
)
stream_state = {}
records = read_incremental(stream, stream_state)
assert records == [{"id": 2, "repository": "organization/repository", "updated_at": "2022-02-02T10:10:04Z"}]
assert stream_state == {"organization/repository": {"updated_at": "2022-02-02T10:10:04Z"}}
records = read_incremental(stream, stream_state)
assert records == [
{"id": 3, "repository": "organization/repository", "updated_at": "2022-02-02T10:10:06Z"},
{"id": 4, "repository": "organization/repository", "updated_at": "2022-02-02T10:10:08Z"},
]
assert stream_state == {"organization/repository": {"updated_at": "2022-02-02T10:10:08Z"}}
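# The first loop covers semi-incremental streams, which honor start_date even
# under full_refresh and therefore drop the older record; the second loop
# covers plain full-refresh streams, which return everything.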
@responses.activate
def test_streams_read_full_refresh():
repository_args = {
"repositories": ["organization/repository"],
"page_size_for_large_streams": 100,
}
repository_args_with_start_date = {**repository_args, "start_date": "2022-02-01T00:00:00Z"}
def get_json_response(cursor_field):
cursor_field = cursor_field or "updated_at"
return [
{"id": 1, cursor_field: "2022-02-01T00:00:00Z"},
{"id": 2, cursor_field: "2022-02-02T00:00:00Z"},
]
def get_records(cursor_field):
cursor_field = cursor_field or "updated_at"
return [
{"id": 1, cursor_field: "2022-02-01T00:00:00Z", "repository": "organization/repository"},
{"id": 2, cursor_field: "2022-02-02T00:00:00Z", "repository": "organization/repository"},
]
for cls, url in [
(Releases, "https://api.github.com/repos/organization/repository/releases"),
(IssueEvents, "https://api.github.com/repos/organization/repository/issues/events"),
(IssueMilestones, "https://api.github.com/repos/organization/repository/milestones"),
(CommitComments, "https://api.github.com/repos/organization/repository/comments"),
(Deployments, "https://api.github.com/repos/organization/repository/deployments"),
]:
stream = cls(**repository_args_with_start_date)
responses.add("GET", url, json=get_json_response(stream.cursor_field))
records = read_full_refresh(stream)
assert records == get_records(stream.cursor_field)[1:2]
for cls, url in [
(Tags, "https://api.github.com/repos/organization/repository/tags"),
(IssueLabels, "https://api.github.com/repos/organization/repository/labels"),
(Collaborators, "https://api.github.com/repos/organization/repository/collaborators"),
(Branches, "https://api.github.com/repos/organization/repository/branches"),
]:
stream = cls(**repository_args)
responses.add("GET", url, json=get_json_response(stream.cursor_field))
records = read_full_refresh(stream)
assert records == get_records(stream.cursor_field)
responses.add(
"GET",
"https://api.github.com/repos/organization/repository/stargazers",
json=[
{"starred_at": "2022-02-01T00:00:00Z", "user": {"id": 1}},
{"starred_at": "2022-02-02T00:00:00Z", "user": {"id": 2}},
],
)
stream = Stargazers(**repository_args_with_start_date)
records = read_full_refresh(stream)
assert records == [{"repository": "organization/repository", "starred_at": "2022-02-02T00:00:00Z", "user": {"id": 2}, "user_id": 2}]

View File: unit_tests/utils.py

@@ -2,7 +2,9 @@
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
#
from typing import Any, MutableMapping
import responses
from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.streams import Stream
@@ -13,3 +15,59 @@ def read_full_refresh(stream_instance: Stream):
for slice in slices:
records.extend(list(stream_instance.read_records(stream_slice=slice, sync_mode=SyncMode.full_refresh)))
return records
def read_incremental(stream_instance: Stream, stream_state: MutableMapping[str, Any]):
res = []
slices = stream_instance.stream_slices(sync_mode=SyncMode.incremental, stream_state=stream_state)
for slice in slices:
records = stream_instance.read_records(sync_mode=SyncMode.incremental, stream_slice=slice, stream_state=stream_state)
for record in records:
stream_state = stream_instance.get_updated_state(stream_state, record)
res.append(record)
return res
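# read_incremental mimics how the CDK drives an incremental sync: state is
# threaded through get_updated_state for every record, and since the GitHub
# streams update that mapping in place, callers can assert on the very dict
# they passed in.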
class ProjectsResponsesAPI:
"""
Fake Responses API for github projects, columns, cards
"""
projects_url = "https://api.github.com/repos/organization/repository/projects"
columns_url = "https://api.github.com/projects/{project_id}/columns"
cards_url = "https://api.github.com/projects/columns/{column_id}/cards"
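# Ids are built positionally by string concatenation: project 2 gets columns
# 21, 22, 23, and column 21 gets cards 211, 212, which is how the tests above
# can predict names like "column_23" and "card_212".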
@classmethod
def get_json_projects(cls, data):
res = []
for n, project in enumerate(data, start=1):
name = f"project_{n}"
res.append({"id": n, "name": name, "updated_at": project["updated_at"]})
return res
@classmethod
def get_json_columns(cls, project, project_id):
res = []
for n, column in enumerate(project.get("columns", []), start=1):
column_id = int(str(project_id) + str(n))
name = f"column_{column_id}"
res.append({"id": column_id, "name": name, "updated_at": column["updated_at"]})
return res
@classmethod
def get_json_cards(cls, column, column_id):
res = []
for n, card in enumerate(column.get("cards", []), start=1):
card_id = int(str(column_id) + str(n))
name = f"card_{card_id}"
res.append({"id": card_id, "name": name, "updated_at": card["updated_at"]})
return res
@classmethod
def register(cls, data):
responses.upsert("GET", cls.projects_url, json=cls.get_json_projects(data))
for project_id, project in enumerate(data, start=1):
responses.upsert("GET", cls.columns_url.format(project_id=project_id), json=cls.get_json_columns(project, project_id))
for n, column in enumerate(project.get("columns", []), start=1):
column_id = int(str(project_id) + str(n))
responses.upsert("GET", cls.cards_url.format(column_id=column_id), json=cls.get_json_cards(column, column_id))