Files
dify/api/tests/unit_tests/services/enterprise/test_rbac_service.py
2026-05-11 18:50:00 +08:00

531 lines
21 KiB
Python

"""Unit tests for services.enterprise.rbac_service.
The enterprise RBAC client is almost pure glue: each method turns a single
``EnterpriseRequest.send_inner_rbac_request`` call into a pydantic response
model. Rather than spinning up an HTTP server we monkeypatch that helper and
assert on the arguments it received; that catches both routing regressions
(wrong method / wrong path / wrong params) and model-shape regressions in
one place.
"""
from __future__ import annotations
from types import SimpleNamespace
from unittest.mock import MagicMock, patch
import pytest
from services.enterprise import rbac_service as svc
MODULE = "services.enterprise.rbac_service"
@pytest.fixture
def mock_send():
with patch(f"{MODULE}.EnterpriseRequest.send_inner_rbac_request") as send:
yield send
def _call_args(send: MagicMock) -> SimpleNamespace:
"""Return the most recent (method, endpoint, kwargs) sent to the mock."""
send.assert_called_once()
args, kwargs = send.call_args
return SimpleNamespace(method=args[0], endpoint=args[1], **kwargs)
class TestCatalog:
def test_workspace_catalog(self, mock_send: MagicMock):
mock_send.return_value = {"groups": [{"group_key": "workspace", "group_name": "工作空间", "permissions": []}]}
out = svc.RBACService.Catalog.workspace("tenant-1", account_id="acct-1")
call = _call_args(mock_send)
assert call.method == "GET"
assert call.endpoint == "/rbac/role-permissions/catalog"
assert call.tenant_id == "tenant-1"
assert call.account_id == "acct-1"
assert call.json is None
assert call.params is None
assert len(out.groups) == 1
assert out.groups[0].group_key == "workspace"
def test_app_catalog_endpoint(self, mock_send: MagicMock):
mock_send.return_value = {"groups": []}
svc.RBACService.Catalog.app("tenant-1")
assert mock_send.call_args.args[1] == "/rbac/role-permissions/catalog/app"
def test_dataset_catalog_endpoint(self, mock_send: MagicMock):
mock_send.return_value = {"groups": []}
svc.RBACService.Catalog.dataset("tenant-1")
assert mock_send.call_args.args[1] == "/rbac/role-permissions/catalog/dataset"
class TestRoles:
def test_list_forwards_pagination_options(self, mock_send: MagicMock):
mock_send.return_value = {
"data": [
{
"id": "role-1",
"tenant_id": "tenant-1",
"type": "workspace",
"category": "global_custom",
"name": "Owner",
"permission_keys": ["workspace.member.manage"],
}
],
"pagination": {"total_count": 1, "per_page": 20, "current_page": 1, "total_pages": 1},
}
out = svc.RBACService.Roles.list(
"tenant-1",
"acct-1",
options=svc.ListOption(page_number=2, results_per_page=50, reverse=True),
)
call = _call_args(mock_send)
assert call.method == "GET"
assert call.endpoint == "/rbac/roles"
assert call.params == {"page_number": 2, "results_per_page": 50, "reverse": "true"}
assert out.pagination and out.pagination.total_count == 1
def test_list_omits_params_when_default(self, mock_send: MagicMock):
mock_send.return_value = {"data": [], "pagination": None}
svc.RBACService.Roles.list("tenant-1")
assert _call_args(mock_send).params is None
def test_list_coerces_null_permission_keys(self, mock_send: MagicMock):
mock_send.return_value = {
"data": [
{
"id": "role-1",
"tenant_id": "tenant-1",
"type": "workspace",
"category": "global_custom",
"name": "Owner",
"permission_keys": None,
}
],
"pagination": None,
}
out = svc.RBACService.Roles.list("tenant-1")
assert out.data[0].permission_keys == []
def test_get_passes_id_query_param(self, mock_send: MagicMock):
mock_send.return_value = {"id": "role-1", "type": "workspace", "name": "Owner"}
svc.RBACService.Roles.get("tenant-1", "acct-1", "role-1")
call = _call_args(mock_send)
assert call.method == "GET"
assert call.endpoint == "/rbac/roles/item"
assert call.params == {"id": "role-1"}
def test_create_sends_body(self, mock_send: MagicMock):
mock_send.return_value = {"id": "role-1", "type": "workspace", "name": "Owner"}
payload = svc.RoleMutation(name="Owner", description="full access", permission_keys=["workspace.member.manage"])
svc.RBACService.Roles.create("tenant-1", "acct-1", payload)
call = _call_args(mock_send)
assert call.method == "POST"
assert call.endpoint == "/rbac/roles"
assert call.json == {
"name": "Owner",
"description": "full access",
"permission_keys": ["workspace.member.manage"],
"type": "workspace",
}
def test_update_sends_id_param_and_body(self, mock_send: MagicMock):
mock_send.return_value = {"id": "role-1", "type": "workspace", "name": "Owner"}
payload = svc.RoleMutation(name="Owner", permission_keys=["x"])
svc.RBACService.Roles.update("tenant-1", "acct-1", "role-1", payload)
call = _call_args(mock_send)
assert call.method == "PUT"
assert call.endpoint == "/rbac/roles/item"
assert call.params == {"id": "role-1"}
assert call.json == {"name": "Owner", "description": "", "permission_keys": ["x"], "type": "workspace"}
def test_delete_uses_delete_method(self, mock_send: MagicMock):
mock_send.return_value = {"message": "success"}
svc.RBACService.Roles.delete("tenant-1", None, "role-1")
call = _call_args(mock_send)
assert call.method == "DELETE"
assert call.endpoint == "/rbac/roles/item"
assert call.params == {"id": "role-1"}
assert call.account_id is None
def test_copy_sends_post_with_id_param(self, mock_send: MagicMock):
mock_send.return_value = {"id": "role-1-copy", "type": "workspace", "name": "Owner copy"}
svc.RBACService.Roles.copy("tenant-1", "acct-1", "role-1")
call = _call_args(mock_send)
assert call.method == "POST"
assert call.endpoint == "/rbac/roles/copy"
assert call.params == {"id": "role-1"}
assert call.account_id == "acct-1"
class TestAccessPolicies:
def test_list_filters_by_resource_type(self, mock_send: MagicMock):
mock_send.return_value = {"data": [], "pagination": None}
svc.RBACService.AccessPolicies.list(
"tenant-1",
"acct-1",
resource_type=svc.RBACResourceType.APP,
options=svc.ListOption(page_number=1),
)
call = _call_args(mock_send)
assert call.endpoint == "/rbac/access-policies"
assert call.params == {"page_number": 1, "resource_type": "app"}
def test_copy_sends_post_with_id_param(self, mock_send: MagicMock):
mock_send.return_value = {
"id": "policy-1-copy",
"resource_type": "app",
"name": "Full access copy",
}
svc.RBACService.AccessPolicies.copy("tenant-1", "acct-1", "policy-1")
call = _call_args(mock_send)
assert call.method == "POST"
assert call.endpoint == "/rbac/access-policies/copy"
assert call.params == {"id": "policy-1"}
def test_create_serialises_resource_type_enum(self, mock_send: MagicMock):
mock_send.return_value = {"id": "policy-1", "resource_type": "dataset", "name": "KB only"}
payload = svc.AccessPolicyCreate(
name="KB only",
resource_type=svc.RBACResourceType.DATASET,
permission_keys=["dataset.acl.readonly"],
)
svc.RBACService.AccessPolicies.create("tenant-1", "acct-1", payload)
call = _call_args(mock_send)
assert call.method == "POST"
assert call.json == {
"name": "KB only",
"resource_type": "dataset",
"description": "",
"permission_keys": ["dataset.acl.readonly"],
}
class TestResourceAccess:
def test_app_matrix(self, mock_send: MagicMock):
mock_send.return_value = {"app_id": "app-1", "items": []}
out = svc.RBACService.AppAccess.matrix("tenant-1", "acct-1", "app-1")
call = _call_args(mock_send)
assert call.method == "GET"
assert call.endpoint == "/rbac/apps/access-policy"
assert call.params == {"app_id": "app-1"}
assert out.app_id == "app-1"
def test_app_replace_bindings(self, mock_send: MagicMock):
mock_send.return_value = {"data": []}
payload = svc.ReplaceBindings(role_ids=["workspace.owner"], account_ids=["acct-2"])
svc.RBACService.AppAccess.replace_bindings("tenant-1", "acct-1", "app-1", "policy-1", payload)
call = _call_args(mock_send)
assert call.method == "PUT"
assert call.endpoint == "/rbac/apps/access-policy/bindings"
assert call.params == {"app_id": "app-1", "policy_id": "policy-1"}
assert call.json == {"role_ids": ["workspace.owner"], "account_ids": ["acct-2"]}
def test_dataset_replace_bindings(self, mock_send: MagicMock):
mock_send.return_value = {"data": []}
payload = svc.ReplaceBindings(role_ids=["workspace.editor"], account_ids=["acct-2"])
svc.RBACService.DatasetAccess.replace_bindings("tenant-1", "acct-1", "ds-1", "policy-1", payload)
call = _call_args(mock_send)
assert call.method == "PUT"
assert call.endpoint == "/rbac/datasets/access-policy/bindings"
assert call.params == {"dataset_id": "ds-1", "policy_id": "policy-1"}
assert call.json == {"role_ids": ["workspace.editor"], "account_ids": ["acct-2"]}
class TestWorkspaceAccess:
def test_app_matrix(self, mock_send: MagicMock):
mock_send.return_value = {"items": [], "pagination": {"total_count": 1, "per_page": 20, "current_page": 2, "total_pages": 1}}
out = svc.RBACService.WorkspaceAccess.app_matrix(
"tenant-1",
options=svc.ListOption(page_number=2, results_per_page=20),
)
call = _call_args(mock_send)
assert call.method == "GET"
assert call.endpoint == "/rbac/workspace/apps/access-policy"
assert call.params == {"page_number": 2, "results_per_page": 20}
assert out.pagination and out.pagination.current_page == 2
def test_dataset_matrix(self, mock_send: MagicMock):
mock_send.return_value = {"items": []}
svc.RBACService.WorkspaceAccess.dataset_matrix("tenant-1")
call = _call_args(mock_send)
assert call.method == "GET"
assert call.endpoint == "/rbac/workspace/datasets/access-policy"
assert call.params is None
def test_workspace_matrix_coerces_null_bindings(self, mock_send: MagicMock):
mock_send.return_value = {
"items": [
{
"policy": {
"id": "policy-1",
"resource_type": "app",
"name": "Workspace App Access",
},
"roles": None,
"accounts": None,
}
],
"pagination": None,
}
out = svc.RBACService.WorkspaceAccess.app_matrix("tenant-1")
assert out.items[0].roles == []
assert out.items[0].accounts == []
def test_workspace_app_replace_bindings(self, mock_send: MagicMock):
mock_send.return_value = {"data": []}
payload = svc.ReplaceBindings(role_ids=["workspace.editor"], account_ids=["acct-2"])
svc.RBACService.WorkspaceAccess.replace_app_bindings(
"tenant-1", "acct-1", "policy-1", payload
)
call = _call_args(mock_send)
assert call.method == "PUT"
assert call.endpoint == "/rbac/workspace/apps/access-policy/bindings"
assert call.params == {"policy_id": "policy-1"}
assert call.json == {"role_ids": ["workspace.editor"], "account_ids": ["acct-2"]}
def test_workspace_dataset_replace_bindings(self, mock_send: MagicMock):
mock_send.return_value = {"data": []}
payload = svc.ReplaceBindings(role_ids=["workspace.editor"], account_ids=["acct-2"])
svc.RBACService.WorkspaceAccess.replace_dataset_bindings(
"tenant-1", "acct-1", "policy-1", payload
)
call = _call_args(mock_send)
assert call.method == "PUT"
assert call.endpoint == "/rbac/workspace/datasets/access-policy/bindings"
assert call.params == {"policy_id": "policy-1"}
assert call.json == {"role_ids": ["workspace.editor"], "account_ids": ["acct-2"]}
class TestMyPermissions:
def test_get_without_payload_uses_get(self, mock_send: MagicMock):
mock_send.return_value = {
"workspace": {"permission_keys": ["workspace.member.manage"]},
"app": {"default_permission_keys": ["app.acl.view_layout", "app.acl.test_and_run"], "overrides": []},
"dataset": {"default_permission_keys": [], "overrides": []},
}
with patch(f"{MODULE}.dify_config.RBAC_ENABLED", True):
out = svc.RBACService.MyPermissions.get("tenant-1", "acct-1")
call = _call_args(mock_send)
assert call.method == "GET"
assert call.endpoint == "/rbac/my-permissions"
assert call.json is None
assert call.params is None
assert out.workspace.permission_keys == ["workspace.member.manage"]
@pytest.mark.parametrize(
("role", "workspace_keys", "app_keys", "dataset_keys"),
[
(
"owner",
["workspace.member.manage", "workspace.role.manage"],
["app.acl.view_layout", "app.acl.test_and_run", "app.acl.edit", "app.acl.access_config"],
["dataset.acl.readonly", "dataset.acl.edit", "dataset.acl.use"],
),
(
"admin",
["workspace.member.manage", "workspace.role.manage"],
["app.acl.view_layout", "app.acl.test_and_run", "app.acl.edit", "app.acl.access_config"],
["dataset.acl.readonly", "dataset.acl.edit", "dataset.acl.use"],
),
(
"editor",
[],
["app.acl.view_layout", "app.acl.test_and_run", "app.acl.edit", "app.acl.access_config"],
["dataset.acl.readonly", "dataset.acl.edit", "dataset.acl.use"],
),
(
"normal",
[],
["app.acl.view_layout", "app.acl.test_and_run"],
[],
),
(
"dataset_operator",
[],
[],
["dataset.acl.readonly", "dataset.acl.edit", "dataset.acl.use"],
),
],
)
def test_get_uses_legacy_role_permissions_when_rbac_disabled(
self,
mock_send: MagicMock,
role: str,
workspace_keys: list[str],
app_keys: list[str],
dataset_keys: list[str],
):
mock_session = MagicMock()
mock_session.__enter__.return_value = mock_session
mock_session.scalar.return_value = role
with (
patch(f"{MODULE}.dify_config.RBAC_ENABLED", False),
patch(f"{MODULE}.session_factory.create_session", return_value=mock_session),
):
out = svc.RBACService.MyPermissions.get("tenant-1", "acct-1")
mock_send.assert_not_called()
assert out.workspace.permission_keys == workspace_keys
assert out.app.default_permission_keys == app_keys
assert out.dataset.default_permission_keys == dataset_keys
assert out.app.overrides == []
assert out.dataset.overrides == []
def test_get_returns_empty_when_role_missing_and_rbac_disabled(self, mock_send: MagicMock):
mock_session = MagicMock()
mock_session.__enter__.return_value = mock_session
mock_session.scalar.return_value = None
with (
patch(f"{MODULE}.dify_config.RBAC_ENABLED", False),
patch(f"{MODULE}.session_factory.create_session", return_value=mock_session),
):
out = svc.RBACService.MyPermissions.get("tenant-1", "acct-1")
mock_send.assert_not_called()
assert out.workspace.permission_keys == []
assert out.app.default_permission_keys == []
assert out.dataset.default_permission_keys == []
def test_get_with_single_resource_filters(self, mock_send: MagicMock):
mock_send.return_value = {
"workspace": {"permission_keys": []},
"app": {"default_permission_keys": [], "overrides": [{"resource_id": "app-1", "permission_keys": ["app.acl.edit"]}]},
"dataset": {"default_permission_keys": [], "overrides": []},
}
with patch(f"{MODULE}.dify_config.RBAC_ENABLED", True):
out = svc.RBACService.MyPermissions.get("tenant-1", "acct-1", app_id="app-1")
call = _call_args(mock_send)
assert call.method == "GET"
assert call.endpoint == "/rbac/my-permissions"
assert call.params == {"app_id": "app-1"}
assert out.app.overrides[0].resource_id == "app-1"
class TestMemberRoles:
def test_get(self, mock_send: MagicMock):
mock_send.return_value = {
"account_id": "acct-2",
"roles": [
{
"id": "role-1",
"type": "workspace",
"name": "Member",
}
],
}
out = svc.RBACService.MemberRoles.get("tenant-1", "acct-1", "acct-2")
call = _call_args(mock_send)
assert call.method == "GET"
assert call.endpoint == "/rbac/members/rbac-roles"
assert call.params == {"account_id": "acct-2"}
assert out.account_id == "acct-2"
assert out.roles[0].name == "Member"
def test_replace(self, mock_send: MagicMock):
mock_send.return_value = {"account_id": "acct-2", "roles": []}
svc.RBACService.MemberRoles.replace(
"tenant-1", "acct-1", "acct-2", role_ids=["workspace.owner", "workspace.editor"]
)
call = _call_args(mock_send)
assert call.method == "PUT"
assert call.endpoint == "/rbac/members/rbac-roles"
assert call.params == {"account_id": "acct-2"}
assert call.json == {"role_ids": ["workspace.owner", "workspace.editor"]}
def test_batch_get(self, mock_send: MagicMock):
mock_send.return_value = {
"data": [
{
"account_id": "acct-2",
"roles": [
{"id": "role-1", "name": "Admin"},
{"id": "role-2", "name": "Editor"},
],
},
{
"account_id": "acct-3",
"roles": [],
},
]
}
out = svc.RBACService.MemberRoles.batch_get("tenant-1", "acct-1", ["acct-2", "acct-3"])
call = _call_args(mock_send)
assert call.method == "POST"
assert call.endpoint == "/rbac/members/rbac-roles/batch"
assert call.json == {"account_ids": ["acct-2", "acct-3"]}
assert out[0].account_id == "acct-2"
assert len(out[0].roles) == 2
class TestResourcePermissions:
def test_app_permissions_batch_get(self, mock_send: MagicMock):
mock_send.return_value = {
"data": [
{"resource_id": "app-1", "permission_keys": ["app.acl.view_layout", "app.acl.edit"]},
{"resource_id": "app-2", "permission_keys": []},
]
}
out = svc.RBACService.AppPermissions.batch_get("tenant-1", "acct-1", ["app-1", "app-2"])
call = _call_args(mock_send)
assert call.method == "POST"
assert call.endpoint == "/rbac/apps/permission-keys/batch"
assert call.json == {"app_ids": ["app-1", "app-2"]}
assert out == {
"app-1": ["app.acl.view_layout", "app.acl.edit"],
"app-2": [],
}
def test_dataset_permissions_batch_get(self, mock_send: MagicMock):
mock_send.return_value = {
"data": [
{"resource_id": "ds-1", "permission_keys": ["dataset.acl.readonly"]},
{"resource_id": "ds-2", "permission_keys": ["dataset.acl.edit"]},
]
}
out = svc.RBACService.DatasetPermissions.batch_get("tenant-1", "acct-1", ["ds-1", "ds-2"])
call = _call_args(mock_send)
assert call.method == "POST"
assert call.endpoint == "/rbac/datasets/permission-keys/batch"
assert call.json == {"dataset_ids": ["ds-1", "ds-2"]}
assert out == {
"ds-1": ["dataset.acl.readonly"],
"ds-2": ["dataset.acl.edit"],
}
class TestListOption:
def test_empty_produces_empty_params(self):
assert svc.ListOption().to_params() == {}
def test_reverse_serialises_as_lowercase_bool(self):
assert svc.ListOption(reverse=False).to_params()["reverse"] == "false"
assert svc.ListOption(reverse=True).to_params()["reverse"] == "true"
def test_extra_overrides_merge(self):
assert svc.ListOption(page_number=1).to_params({"resource_type": "app", "skip": None}) == {
"page_number": 1,
"resource_type": "app",
}