* CDK low-code: Add token_expiry_date_format to OAuth Authenticator * CDK low-code: Resolve ref schema * CDK low-code: Resolve ref schema * CDK low-code: Resolve ref schema * CDK low-code: Add test for token_expiry_date_format * CDK low-code: set initial time before refresh request * CDK low-code: Add test schema loader * CDK low-code: Add test dependencies * CDK low-code: Refactor JsonFileSchemaLoader (inherit from original CDK) * CDK low-code: Fix SingleUseRefreshTokenOauth2Authenticator (add token_expiry_date_format) * CDK low-code: Fix tests * CDK low-code: Refactor import * CDK low-code: Refactor JsonFileSchemaLoader * CDK low-code: format
138 lines
5.1 KiB
Python
138 lines
5.1 KiB
Python
#
|
|
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
|
|
#
|
|
|
|
import logging
|
|
|
|
import freezegun
|
|
import pendulum
|
|
import pytest
|
|
import requests
|
|
from airbyte_cdk.sources.declarative.auth import DeclarativeOauth2Authenticator
|
|
from requests import Response
|
|
|
|
LOGGER = logging.getLogger(__name__)
|
|
|
|
resp = Response()
|
|
|
|
config = {
|
|
"refresh_endpoint": "refresh_end",
|
|
"client_id": "some_client_id",
|
|
"client_secret": "some_client_secret",
|
|
"token_expiry_date": pendulum.now().subtract(days=2).to_rfc3339_string(),
|
|
"custom_field": "in_outbound_request",
|
|
"another_field": "exists_in_body",
|
|
"grant_type": "some_grant_type",
|
|
}
|
|
options = {"refresh_token": "some_refresh_token"}
|
|
|
|
|
|
class TestOauth2Authenticator:
|
|
"""
|
|
Test class for OAuth2Authenticator.
|
|
"""
|
|
|
|
def test_refresh_request_body(self):
|
|
"""
|
|
Request body should match given configuration.
|
|
"""
|
|
scopes = ["scope1", "scope2"]
|
|
oauth = DeclarativeOauth2Authenticator(
|
|
token_refresh_endpoint="{{ config['refresh_endpoint'] }}",
|
|
client_id="{{ config['client_id'] }}",
|
|
client_secret="{{ config['client_secret'] }}",
|
|
refresh_token="{{ options['refresh_token'] }}",
|
|
config=config,
|
|
scopes=["scope1", "scope2"],
|
|
token_expiry_date="{{ config['token_expiry_date'] }}",
|
|
refresh_request_body={
|
|
"custom_field": "{{ config['custom_field'] }}",
|
|
"another_field": "{{ config['another_field'] }}",
|
|
"scopes": ["no_override"],
|
|
},
|
|
options=options,
|
|
grant_type="{{ config['grant_type'] }}"
|
|
)
|
|
body = oauth.build_refresh_request_body()
|
|
expected = {
|
|
"grant_type": "some_grant_type",
|
|
"client_id": "some_client_id",
|
|
"client_secret": "some_client_secret",
|
|
"refresh_token": "some_refresh_token",
|
|
"scopes": scopes,
|
|
"custom_field": "in_outbound_request",
|
|
"another_field": "exists_in_body",
|
|
}
|
|
assert body == expected
|
|
|
|
def test_refresh_access_token(self, mocker):
|
|
oauth = DeclarativeOauth2Authenticator(
|
|
token_refresh_endpoint="{{ config['refresh_endpoint'] }}",
|
|
client_id="{{ config['client_id'] }}",
|
|
client_secret="{{ config['client_secret'] }}",
|
|
refresh_token="{{ config['refresh_token'] }}",
|
|
config=config,
|
|
scopes=["scope1", "scope2"],
|
|
token_expiry_date="{{ config['token_expiry_date'] }}",
|
|
refresh_request_body={
|
|
"custom_field": "{{ config['custom_field'] }}",
|
|
"another_field": "{{ config['another_field'] }}",
|
|
"scopes": ["no_override"],
|
|
},
|
|
options={},
|
|
)
|
|
|
|
resp.status_code = 200
|
|
mocker.patch.object(resp, "json", return_value={"access_token": "access_token", "expires_in": 1000})
|
|
mocker.patch.object(requests, "request", side_effect=mock_request, autospec=True)
|
|
token = oauth.refresh_access_token()
|
|
|
|
schem = DeclarativeOauth2Authenticator.json_schema()
|
|
print(schem)
|
|
|
|
assert ("access_token", 1000) == token
|
|
|
|
@pytest.mark.parametrize(
|
|
"expires_in_response, token_expiry_date_format",
|
|
[
|
|
(86400, None),
|
|
("2020-01-02T00:00:00Z", "YYYY-MM-DDTHH:mm:ss[Z]"),
|
|
('2020-01-02T00:00:00.000000+00:00', "YYYY-MM-DDTHH:mm:ss.SSSSSSZ"),
|
|
("2020-01-02", "YYYY-MM-DD"),
|
|
],
|
|
ids=["time_in_seconds", "rfc3339", "iso8601", "simple_date"]
|
|
)
|
|
@freezegun.freeze_time("2020-01-01")
|
|
def test_refresh_access_token_expire_format(self, mocker, expires_in_response, token_expiry_date_format):
|
|
next_day = "2020-01-02T00:00:00Z"
|
|
config.update({"token_expiry_date": pendulum.parse(next_day).subtract(days=2).to_rfc3339_string()})
|
|
oauth = DeclarativeOauth2Authenticator(
|
|
token_refresh_endpoint="{{ config['refresh_endpoint'] }}",
|
|
client_id="{{ config['client_id'] }}",
|
|
client_secret="{{ config['client_secret'] }}",
|
|
refresh_token="{{ config['refresh_token'] }}",
|
|
config=config,
|
|
scopes=["scope1", "scope2"],
|
|
token_expiry_date="{{ config['token_expiry_date'] }}",
|
|
token_expiry_date_format=token_expiry_date_format,
|
|
refresh_request_body={
|
|
"custom_field": "{{ config['custom_field'] }}",
|
|
"another_field": "{{ config['another_field'] }}",
|
|
"scopes": ["no_override"],
|
|
},
|
|
options={},
|
|
)
|
|
|
|
resp.status_code = 200
|
|
mocker.patch.object(resp, "json", return_value={"access_token": "access_token", "expires_in": expires_in_response})
|
|
mocker.patch.object(requests, "request", side_effect=mock_request, autospec=True)
|
|
token = oauth.get_access_token()
|
|
assert "access_token" == token
|
|
assert oauth.get_token_expiry_date() == pendulum.parse(next_day)
|
|
|
|
|
|
def mock_request(method, url, data):
|
|
if url == "refresh_end":
|
|
return resp
|
|
raise Exception(f"Error while refreshing access token with request: {method}, {url}, {data}")
|