1
0
mirror of synced 2025-12-25 02:09:19 -05:00

Source auth0: new streams and fix incremental (#29001)

* Add new streams Organizations, OrganizationMembers, OrganizationMemberRoles

* relax schema definition to allow additional fields

* Bump image tag version

* revert some changes to the old schemas

* Format python so gradle can pass

* update incremental

* remove unused print

* fix unit test

---------

Co-authored-by: Vasilis Gavriilidis <vasilis.gavriilidis@orfium.com>
This commit is contained in:
Marcos Marx
2023-08-03 11:05:19 -03:00
committed by GitHub
parent 3bc79be30a
commit 03ffad5d4f
16 changed files with 415 additions and 52 deletions

View File

@@ -34,5 +34,5 @@ COPY source_auth0 ./source_auth0
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]
LABEL io.airbyte.version=0.2.0
LABEL io.airbyte.version=0.3.0
LABEL io.airbyte.name=airbyte/source-auth0

View File

@@ -1,22 +1,30 @@
connector_image: airbyte/source-auth0:dev
tests:
acceptance_tests:
spec:
- spec_path: "source_auth0/spec.yaml"
tests:
- spec_path: "source_auth0/spec.yaml"
connection:
- config_path: "secrets/config.json"
status: "succeed"
- config_path: "integration_tests/invalid_config.json"
status: "failed"
tests:
- config_path: "secrets/config.json"
status: "succeed"
- config_path: "integration_tests/invalid_config.json"
status: "failed"
discovery:
- config_path: "secrets/config.json"
tests:
- config_path: "secrets/config.json"
basic_read:
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog.json"
empty_streams: []
tests:
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog.json"
empty_streams: []
fail_on_extra_columns: false
incremental:
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog.json"
future_state_path: "integration_tests/abnormal_state.json"
tests:
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog.json"
future_state:
future_state_path: "integration_tests/abnormal_state.json"
full_refresh:
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog.json"
tests:
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog.json"

View File

@@ -1,3 +1,9 @@
{
"users": { "updated_at": "3021-09-08T07:04:28.000Z" }
}
[
{
"type": "STREAM",
"stream": {
"stream_state": { "updated_at": "5000-08-02T16:18:47.824Z" },
"stream_descriptor": { "name": "users" }
}
}
]

View File

@@ -20,6 +20,36 @@
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite",
"primary_key": [["client_id"]]
},
{
"stream": {
"name": "organizations",
"json_schema": {},
"supported_sync_modes": ["full_refresh"]
},
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite",
"primary_key": [["id"]]
},
{
"stream": {
"name": "organization_members",
"json_schema": {},
"supported_sync_modes": ["full_refresh"]
},
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite",
"primary_key": [["id"]]
},
{
"stream": {
"name": "organization_member_roles",
"json_schema": {},
"supported_sync_modes": ["full_refresh"]
},
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite",
"primary_key": [["id"]]
}
]
}

View File

@@ -1,3 +1,9 @@
{
"users": { "updated_at": "2021-09-08T07:04:28.000Z" }
}
[
{
"type": "STREAM",
"stream": {
"stream_state": { "updated_at": "2000-08-02T16:18:47.824Z" },
"stream_descriptor": { "name": "users" }
}
}
]

View File

@@ -2,7 +2,7 @@ data:
connectorSubtype: api
connectorType: source
definitionId: 6c504e48-14aa-4221-9a72-19cf5ff1ae78
dockerImageTag: 0.2.0
dockerImageTag: 0.3.0
dockerRepository: airbyte/source-auth0
githubIssueLabel: source-auth0
icon: auth0.svg

View File

@@ -121,7 +121,7 @@
}
},
"signing_keys": {
"type": ["array", "null"],
"type": ["null", "array"],
"items": {
"type": ["object", "null"],
"additionalProperties": true

View File

@@ -0,0 +1,22 @@
{
"$schema": "http://json-schema.org/draft-07/schema#",
"type": ["object", "null"],
"additionalProperties": true,
"properties": {
"id": {
"type": ["string", "null"]
},
"org_id": {
"type": ["string", "null"]
},
"user_id": {
"type": ["string", "null"]
},
"name": {
"type": ["string", "null"]
},
"description": {
"type": ["string", "null"]
}
}
}

View File

@@ -0,0 +1,25 @@
{
"$schema": "http://json-schema.org/draft-07/schema#",
"type": ["object", "null"],
"additionalProperties": true,
"properties":{
"id": {
"type": ["string", "null"]
},
"org_id": {
"type": ["string", "null"]
},
"user_id": {
"type": ["string", "null"]
},
"name": {
"type": ["string", "null"]
},
"email": {
"type": ["string", "null"]
},
"picture": {
"type": ["string", "null"]
}
}
}

View File

@@ -0,0 +1,24 @@
{
"$schema": "http://json-schema.org/draft-07/schema#",
"type": ["object", "null"],
"additionalProperties": true,
"properties": {
"id": {
"type": ["string", "null"]
},
"name": {
"type": ["string", "null"]
},
"display_name": {
"type": ["string", "null"]
},
"branding": {
"type": ["object", "null"],
"additionalProperties": true
},
"metadata": {
"type": ["object", "null"],
"additionalProperties": true
}
}
}

View File

@@ -32,10 +32,15 @@
"type": ["string", "null"]
},
"identities": {
"type": "array",
"type": ["null", "array"],
"items": {
"type": ["object", "null"],
"additionalProperties": true
"additionalProperties": true,
"properties": {
"connection": {
"type": ["string", "null"]
}
}
}
},
"app_metadata": {
@@ -56,8 +61,11 @@
"type": ["string", "null"]
},
"multifactor": {
"type": ["object", "null"],
"additionalProperties": true
"type": ["null", "array"],
"additionalProperties": true,
"items": {
"type": ["string", "null"]
}
},
"last_ip": {
"type": ["string", "null"]

View File

@@ -4,18 +4,27 @@
import logging
from abc import ABC, abstractmethod
from abc import ABC
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Tuple
from urllib import parse
import pendulum
import requests
from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources import AbstractSource
from airbyte_cdk.sources.streams import IncrementalMixin, Stream
from airbyte_cdk.sources.streams.http import HttpStream
from source_auth0.utils import get_api_endpoint, initialize_authenticator
def read_full_refresh(stream_instance: Stream):
    """Yield every record of ``stream_instance`` using a full-refresh read.

    The stream is asked for its full-refresh slices, and each slice is read in
    turn; records are yielded in the order the stream produces them.
    """
    for stream_slice in stream_instance.stream_slices(sync_mode=SyncMode.full_refresh):
        yield from stream_instance.read_records(stream_slice=stream_slice, sync_mode=SyncMode.full_refresh)
# Basic full refresh stream
class Auth0Stream(HttpStream, ABC):
api_version = "v2"
@@ -84,38 +93,40 @@ class Auth0Stream(HttpStream, ABC):
class IncrementalAuth0Stream(Auth0Stream, IncrementalMixin):
min_id = ""
cursor_field = "updated_at"
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._cursor_value = self.min_id
@property
@abstractmethod
def cursor_field(self) -> str:
pass
self._cursor_value = None
@property
def state(self) -> MutableMapping[str, Any]:
return {self.cursor_field: self._cursor_value}
if self._cursor_value:
return {self.cursor_field: self._cursor_value}
else:
return {self.cursor_field: self.min_id}
@state.setter
def state(self, value: MutableMapping[str, Any]):
self._cursor_value = value.get(self.cursor_field)
def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]:
new_state_value = max(latest_record.get(self.cursor_field), current_stream_state.get(self.cursor_field, self.min_id))
self._cursor_value = new_state_value
return {self.cursor_field: new_state_value}
def request_params(
self, stream_state: Mapping[str, Any], next_page_token: Mapping[str, Any] = None, **kwargs
) -> MutableMapping[str, Any]:
params = super().request_params(stream_state=self.state, next_page_token=next_page_token, **kwargs)
latest_entry = self.state.get(self.cursor_field)
filter_param = {"include_totals": "false", "sort": f"{self.cursor_field}:1", "q": f"{self.cursor_field}:{{{latest_entry} TO *]"}
filter_param = {"include_totals": "false", "sort": f"{self.cursor_field}:1"}
if self.state:
filter_param["q"] = self.cursor_field + ":{" + self.state.get(self.cursor_field) + " TO *]"
params.update(filter_param)
return params
def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
entities = response.json()
if entities:
last_item = entities[-1]
self.state = last_item
yield from entities
@@ -123,6 +134,7 @@ class Clients(Auth0Stream):
primary_key = "client_id"
resource_name = "clients"
class Users(IncrementalAuth0Stream):
min_id = "1900-01-01T00:00:00.000Z"
primary_key = "user_id"
@@ -130,6 +142,61 @@ class Users(IncrementalAuth0Stream):
cursor_field = "updated_at"
class Organizations(Auth0Stream):
    """Stream for the ``organizations`` resource, keyed by ``id``."""

    resource_name = "organizations"
    primary_key = "id"
class OrganizationMembers(Auth0Stream):
    """Members of every organization, read one organization at a time.

    A private ``Organizations`` stream is read in full to enumerate the
    organizations; each organization's members endpoint is then queried.
    Every emitted record is enriched with ``org_id`` and a composite ``id``
    of the form ``<organization_id>_<user_id>``.
    """

    primary_key = "id"
    resource_name = "members"

    def __init__(self, url_base: str, *args, **kwargs):
        """Build the stream and the parent ``Organizations`` stream from the same arguments."""
        super().__init__(*args, url_base=url_base, **kwargs)
        self.organizations = Organizations(*args, url_base=url_base, **kwargs)

    def path(self, stream_slice: Mapping[str, Any], **kwargs) -> str:
        """Return the members endpoint for the organization named in ``stream_slice``."""
        return f"organizations/{stream_slice['organization_id']}/members"

    def read_records(self, stream_slice: Optional[Mapping[str, Any]] = None, **kwargs) -> Iterable[Mapping[str, Any]]:
        """Yield the members of each organization.

        The incoming ``stream_slice`` is not used; a slice is built per
        organization from the parent stream's records instead.
        """
        for organization in read_full_refresh(self.organizations):
            organization_slice = {"organization_id": organization["id"]}
            yield from super().read_records(stream_slice=organization_slice, **kwargs)

    def parse_response(self, response: requests.Response, stream_slice: Optional[Mapping[str, Any]] = None, **kwargs) -> Iterable[Mapping]:
        """Yield each member from the response, tagged with its organization.

        Records are taken from the ``members`` key of the JSON body and are
        mutated in place before being yielded.
        """
        members = response.json().get(self.resource_name)
        for member in members:
            # Looked up per record (not hoisted) so an empty response never touches the slice.
            organization_id = stream_slice["organization_id"]
            member["org_id"] = organization_id
            member["id"] = "_".join((organization_id, member["user_id"]))
            yield member
class OrganizationMemberRoles(Auth0Stream):
    """Roles assigned to each organization member.

    A private ``OrganizationMembers`` stream is read in full to enumerate
    (organization, user) pairs; the roles endpoint is then queried for each
    pair. Every emitted record is enriched with ``org_id`` and ``user_id``.

    NOTE(review): ``id`` is passed through exactly as returned by the API; no
    per-member id is synthesized here (unlike ``OrganizationMembers``).
    Confirm that it is unique enough to serve as the primary key.
    """

    primary_key = "id"
    resource_name = "roles"

    def __init__(self, url_base: str, *args, **kwargs):
        """Build the stream and the parent ``OrganizationMembers`` stream from the same arguments."""
        super().__init__(*args, url_base=url_base, **kwargs)
        self.organization_members = OrganizationMembers(*args, url_base=url_base, **kwargs)

    def path(self, stream_slice: Mapping[str, Any], **kwargs) -> str:
        """Return the roles endpoint for the organization/user pair in ``stream_slice``."""
        return f"organizations/{stream_slice['organization_id']}/members/{stream_slice['user_id']}/roles"

    def read_records(self, stream_slice: Optional[Mapping[str, Any]] = None, **kwargs) -> Iterable[Mapping[str, Any]]:
        """Yield the roles of every organization member.

        The incoming ``stream_slice`` is not used; a slice is built per member
        from the parent stream's records instead.
        """
        for member in read_full_refresh(self.organization_members):
            member_slice = {"organization_id": member["org_id"], "user_id": member["user_id"]}
            yield from super().read_records(stream_slice=member_slice, **kwargs)

    def parse_response(self, response: requests.Response, stream_slice: Optional[Mapping[str, Any]] = None, **kwargs) -> Iterable[Mapping]:
        """Yield each role from the response, tagged with its organization and user.

        Records are taken from the ``roles`` key of the JSON body and are
        mutated in place before being yielded.
        """
        roles = response.json().get(self.resource_name)
        for role in roles:
            # Slice lookups stay inside the loop so an empty response never touches the slice.
            role["org_id"] = stream_slice["organization_id"]
            role["user_id"] = stream_slice["user_id"]
            yield role
# Source
class SourceAuth0(AbstractSource):
def check_connection(self, logger: logging.Logger, config: Mapping[str, Any]) -> Tuple[bool, any]:
@@ -152,4 +219,10 @@ class SourceAuth0(AbstractSource):
def streams(self, config: Mapping[str, Any]) -> List[Stream]:
initialization_params = {"authenticator": initialize_authenticator(config), "url_base": config.get("base_url")}
return [Clients(**initialization_params), Users(**initialization_params)]
return [
Clients(**initialization_params),
Organizations(**initialization_params),
OrganizationMembers(**initialization_params),
OrganizationMemberRoles(**initialization_params),
Users(**initialization_params),
]

View File

@@ -240,7 +240,50 @@ def clients_instance():
},
"organization_usage": "deny",
"organization_require_behavior": "no_prompt",
"client_authentication_methods": {"private_key_jwt": {"credentials": ["object"]}}
"client_authentication_methods": {"private_key_jwt": {"credentials": ["object"]}},
}
@pytest.fixture()
def organization_instance():
    """
    Sample organization record used in mocked API responses
    """
    organization = {
        "id": "my_org_id",
        "name": "My application",
        "display_name": "My display_name",
        "branding": "brand",
        "metadata": "metadata_example",
    }
    return organization
@pytest.fixture()
def organization_member_instance():
    """
    Sample organization member record, including the org_id and composite id fields
    """
    member = {
        "id": "my_org_id_my_user_id",
        "org_id": "my_org_id",
        "user_id": "my_user_id",
        "name": "my_name",
        "email": "my_email",
        "picture": "my_picture",
    }
    return member
@pytest.fixture()
def organization_member_roles_instance():
    """
    Sample organization member role record, including the org_id and user_id fields
    """
    role = {
        "id": "something",
        "org_id": "my_org_id",
        "user_id": "my_user_id",
        "name": "my_name",
        "description": "desc",
    }
    return role

View File

@@ -6,7 +6,15 @@ from unittest.mock import MagicMock
from airbyte_cdk.sources.streams.http.requests_native_auth.token import TokenAuthenticator
from source_auth0.authenticator import Auth0Oauth2Authenticator
from source_auth0.source import SourceAuth0, Clients, Users, initialize_authenticator
from source_auth0.source import (
Clients,
OrganizationMemberRoles,
OrganizationMembers,
Organizations,
SourceAuth0,
Users,
initialize_authenticator,
)
class TestAuthentication:
@@ -71,14 +79,23 @@ class TestAuthentication:
source_auth0 = SourceAuth0()
requests_mock.get(f"{api_url}/api/v2/users?per_page=1", json={"connect": "ok"})
requests_mock.get(f"{api_url}/api/v2/clients?per_page=1", json={"connect": "ok"})
requests_mock.get(f"{api_url}/api/v2/organizations?per_page=1", json={"connect": "ok"})
requests_mock.get(f"{api_url}/api/v2/organizations/test_org_id/members?per_page=1", json={"connect": "ok"})
requests_mock.get(f"{api_url}/api/v2/organizations/test_org_id/members/test_user_id/roles?per_page=1", json={"connect": "ok"})
requests_mock.post(f"{api_url}/oauth/token", json={"access_token": "test_token", "expires_in": 948})
streams = source_auth0.streams(config=oauth_config)
streams_supported = [Clients, Users]
streams_supported = [
Clients,
Organizations,
OrganizationMembers,
OrganizationMemberRoles,
Users,
]
# check the number of streams supported
assert len(streams) == len(streams_supported)
# and each stream to be specific stream
assert isinstance(streams[0], streams_supported[0])
assert isinstance(streams[1], streams_supported[1])
for s in range(len(streams)):
assert isinstance(streams[s], streams_supported[s])

View File

@@ -9,7 +9,15 @@ from unittest.mock import MagicMock
import pytest
import requests
from airbyte_cdk.models import SyncMode
from source_auth0.source import Auth0Stream, IncrementalAuth0Stream, Users, Clients
from source_auth0.source import (
Auth0Stream,
Clients,
IncrementalAuth0Stream,
OrganizationMemberRoles,
OrganizationMembers,
Organizations,
Users,
)
@pytest.fixture
@@ -55,8 +63,8 @@ class TestAuth0Stream:
"page": 0,
"per_page": 50,
"include_totals": "false",
"sort": "None:1",
"q": "None:{ TO *]",
"sort": "updated_at:1",
"q": "updated_at:{ TO *]",
}
assert stream.request_params(**inputs) == expected_params
@@ -97,8 +105,6 @@ class TestAuth0Stream:
cursor_field = "lastUpdated"
stream = TestIncrementalAuth0Stream(url_base=url_base)
stream._cursor_field = "lastUpdated"
assert stream._cursor_value == ""
stream.state = {"lastUpdated": "123"}
assert stream._cursor_value == "123"
@@ -258,3 +264,93 @@ class TestStreamClients:
json={"total": 1, "start": 0, "limit": 50, "clients": [clients_instance]},
)
assert list(stream.parse_response(response=requests.get(f"{api_url}/clients"))) == [clients_instance]
class TestStreamOrganizations:
    """Tests for the ``Organizations`` stream against a mocked organizations endpoint."""

    def test_stream_organizations(self, patch_base_class, organization_instance, url_base, api_url, requests_mock):
        """Reading the stream in full refresh returns the mocked organization."""
        stream = Organizations(url_base=url_base)
        mocked_body = {"total": 1, "start": 0, "limit": 50, "organizations": [organization_instance]}
        requests_mock.get(f"{api_url}/organizations", json=mocked_body)

        records = list(stream.read_records(sync_mode=SyncMode.full_refresh))

        assert records == [organization_instance]

    def test_organizations_source_parse_response(self, requests_mock, patch_base_class, organization_instance, url_base, api_url):
        """Parsing a mocked response yields the organization it contains."""
        stream = Organizations(url_base=url_base)
        mocked_body = {"total": 1, "start": 0, "limit": 50, "organizations": [organization_instance]}
        requests_mock.get(f"{api_url}/organizations", json=mocked_body)

        response = requests.get(f"{api_url}/organizations")
        parsed = list(stream.parse_response(response=response))

        assert parsed == [organization_instance]
class TestStreamOrganizationsMembers:
    """Tests for the ``OrganizationMembers`` stream against mocked endpoints."""

    def test_stream_organizations(
        self, patch_base_class, organization_instance, organization_member_instance, url_base, api_url, requests_mock
    ):
        """Reading the stream walks the mocked organization and returns its member."""
        stream = OrganizationMembers(url_base=url_base)
        organizations_body = {"total": 1, "start": 0, "limit": 50, "organizations": [organization_instance]}
        members_body = {"total": 1, "start": 0, "limit": 50, "members": [organization_member_instance]}
        requests_mock.get(f"{api_url}/organizations", json=organizations_body)
        requests_mock.get(f"{api_url}/organizations/my_org_id/members", json=members_body)

        records = list(stream.read_records(sync_mode=SyncMode.full_refresh))

        assert records == [organization_member_instance]

    def test_organizations_source_parse_response(self, requests_mock, patch_base_class, organization_member_instance, url_base, api_url):
        """Parsing a mocked members response with a slice yields the expected member."""
        stream = OrganizationMembers(url_base=url_base)
        members_body = {"total": 1, "start": 0, "limit": 50, "members": [organization_member_instance]}
        requests_mock.get(f"{api_url}/organizations/my_org_id/members", json=members_body)
        stream_slice = {"organization_id": "my_org_id"}

        response = requests.get(f"{api_url}/organizations/my_org_id/members")
        parsed = list(stream.parse_response(response=response, stream_slice=stream_slice))

        assert parsed == [organization_member_instance]
class TestStreamOrganizationsMemberRoles:
    """Tests for the ``OrganizationMemberRoles`` stream against mocked endpoints."""

    def test_stream_organizations(
        self,
        patch_base_class,
        organization_instance,
        organization_member_instance,
        organization_member_roles_instance,
        url_base,
        api_url,
        requests_mock,
    ):
        """Reading the stream walks organization -> member -> roles and returns the mocked role."""
        stream = OrganizationMemberRoles(url_base=url_base)
        organizations_body = {"total": 1, "start": 0, "limit": 50, "organizations": [organization_instance]}
        members_body = {"total": 1, "start": 0, "limit": 50, "members": [organization_member_instance]}
        roles_body = {"total": 1, "start": 0, "limit": 50, "roles": [organization_member_roles_instance]}
        requests_mock.get(f"{api_url}/organizations", json=organizations_body)
        requests_mock.get(f"{api_url}/organizations/my_org_id/members", json=members_body)
        requests_mock.get(f"{api_url}/organizations/my_org_id/members/my_user_id/roles", json=roles_body)

        records = list(stream.read_records(sync_mode=SyncMode.full_refresh))

        assert records == [organization_member_roles_instance]

    def test_organizations_source_parse_response(
        self, requests_mock, patch_base_class, organization_member_roles_instance, url_base, api_url
    ):
        """Parsing a mocked roles response with a slice yields the expected role."""
        stream = OrganizationMemberRoles(url_base=url_base)
        roles_body = {"total": 1, "start": 0, "limit": 50, "roles": [organization_member_roles_instance]}
        requests_mock.get(f"{api_url}/organizations/my_org_id/members/my_user_id/roles", json=roles_body)
        stream_slice = {"organization_id": "my_org_id", "user_id": "my_user_id"}

        response = requests.get(f"{api_url}/organizations/my_org_id/members/my_user_id/roles")
        parsed = list(stream.parse_response(response=response, stream_slice=stream_slice))

        assert parsed == [organization_member_roles_instance]