From cd3327013a82d73300a347fe6cf6a79c67db47d0 Mon Sep 17 00:00:00 2001 From: Escape0707 Date: Thu, 7 May 2026 18:57:53 +0900 Subject: [PATCH] chore(test): Move plugin permission tests to testcontainers (#35884) --- .../plugin/test_plugin_permission_service.py | 94 +++++++++++++++++++ .../plugin/test_plugin_permission_service.py | 79 ---------------- 2 files changed, 94 insertions(+), 79 deletions(-) create mode 100644 api/tests/test_containers_integration_tests/services/plugin/test_plugin_permission_service.py delete mode 100644 api/tests/unit_tests/services/plugin/test_plugin_permission_service.py diff --git a/api/tests/test_containers_integration_tests/services/plugin/test_plugin_permission_service.py b/api/tests/test_containers_integration_tests/services/plugin/test_plugin_permission_service.py new file mode 100644 index 0000000000..49d06986fd --- /dev/null +++ b/api/tests/test_containers_integration_tests/services/plugin/test_plugin_permission_service.py @@ -0,0 +1,94 @@ +from __future__ import annotations + +from uuid import uuid4 + +from sqlalchemy import func, select +from sqlalchemy.orm import Session + +from models.account import TenantPluginPermission +from services.plugin.plugin_permission_service import PluginPermissionService + + +def _tenant_id() -> str: + return str(uuid4()) + + +def _get_permission(session: Session, tenant_id: str) -> TenantPluginPermission | None: + session.expire_all() + stmt = select(TenantPluginPermission).where(TenantPluginPermission.tenant_id == tenant_id) + return session.scalars(stmt).one_or_none() + + +def _count_permissions(session: Session, tenant_id: str) -> int: + stmt = select(func.count()).select_from(TenantPluginPermission).where(TenantPluginPermission.tenant_id == tenant_id) + return session.scalar(stmt) or 0 + + +class TestGetPermission: + """Integration tests for PluginPermissionService.get_permission using testcontainers.""" + + def test_returns_permission_when_found(self, db_session_with_containers: Session): + tenant_id = _tenant_id() + permission = TenantPluginPermission( + tenant_id=tenant_id, + install_permission=TenantPluginPermission.InstallPermission.ADMINS, + debug_permission=TenantPluginPermission.DebugPermission.EVERYONE, + ) + db_session_with_containers.add(permission) + db_session_with_containers.commit() + + result = PluginPermissionService.get_permission(tenant_id) + + assert result is not None + assert result.id == permission.id + assert result.tenant_id == tenant_id + assert result.install_permission == TenantPluginPermission.InstallPermission.ADMINS + assert result.debug_permission == TenantPluginPermission.DebugPermission.EVERYONE + + def test_returns_none_when_not_found(self, db_session_with_containers: Session): + result = PluginPermissionService.get_permission(_tenant_id()) + + assert result is None + + +class TestChangePermission: + """Integration tests for PluginPermissionService.change_permission using testcontainers.""" + + def test_creates_new_permission_when_not_exists(self, db_session_with_containers: Session): + tenant_id = _tenant_id() + + result = PluginPermissionService.change_permission( + tenant_id, + TenantPluginPermission.InstallPermission.EVERYONE, + TenantPluginPermission.DebugPermission.EVERYONE, + ) + + permission = _get_permission(db_session_with_containers, tenant_id) + assert result is True + assert permission is not None + assert permission.install_permission == TenantPluginPermission.InstallPermission.EVERYONE + assert permission.debug_permission == TenantPluginPermission.DebugPermission.EVERYONE + + def test_updates_existing_permission(self, db_session_with_containers: Session): + tenant_id = _tenant_id() + existing = TenantPluginPermission( + tenant_id=tenant_id, + install_permission=TenantPluginPermission.InstallPermission.EVERYONE, + debug_permission=TenantPluginPermission.DebugPermission.EVERYONE, + ) + db_session_with_containers.add(existing) + db_session_with_containers.commit() + + result = PluginPermissionService.change_permission( + tenant_id, + TenantPluginPermission.InstallPermission.ADMINS, + TenantPluginPermission.DebugPermission.ADMINS, + ) + + permission = _get_permission(db_session_with_containers, tenant_id) + assert result is True + assert permission is not None + assert permission.id == existing.id + assert permission.install_permission == TenantPluginPermission.InstallPermission.ADMINS + assert permission.debug_permission == TenantPluginPermission.DebugPermission.ADMINS + assert _count_permissions(db_session_with_containers, tenant_id) == 1 diff --git a/api/tests/unit_tests/services/plugin/test_plugin_permission_service.py b/api/tests/unit_tests/services/plugin/test_plugin_permission_service.py deleted file mode 100644 index 53a9e6210c..0000000000 --- a/api/tests/unit_tests/services/plugin/test_plugin_permission_service.py +++ /dev/null @@ -1,79 +0,0 @@ -from unittest.mock import MagicMock, patch - -from models.account import TenantPluginPermission - -MODULE = "services.plugin.plugin_permission_service" - - -def _patched_session(): - """Patch session_factory.create_session() to return a mock session as context manager.""" - session = MagicMock() - session.__enter__ = MagicMock(return_value=session) - session.__exit__ = MagicMock(return_value=False) - session.begin.return_value.__enter__ = MagicMock(return_value=session) - session.begin.return_value.__exit__ = MagicMock(return_value=False) - mock_factory = MagicMock() - mock_factory.create_session.return_value = session - patcher = patch(f"{MODULE}.session_factory", mock_factory) - return patcher, session - - -class TestGetPermission: - def test_returns_permission_when_found(self): - p1, session = _patched_session() - permission = MagicMock() - session.scalar.return_value = permission - - with p1: - from services.plugin.plugin_permission_service import PluginPermissionService - - result = PluginPermissionService.get_permission("t1") - - assert result is permission - - def test_returns_none_when_not_found(self): - p1, session = _patched_session() - session.scalar.return_value = None - - with p1: - from services.plugin.plugin_permission_service import PluginPermissionService - - result = PluginPermissionService.get_permission("t1") - - assert result is None - - -class TestChangePermission: - def test_creates_new_permission_when_not_exists(self): - p1, session = _patched_session() - session.scalar.return_value = None - - with p1, patch(f"{MODULE}.select"), patch(f"{MODULE}.TenantPluginPermission") as perm_cls: - perm_cls.return_value = MagicMock() - from services.plugin.plugin_permission_service import PluginPermissionService - - result = PluginPermissionService.change_permission( - "t1", TenantPluginPermission.InstallPermission.EVERYONE, TenantPluginPermission.DebugPermission.EVERYONE - ) - - assert result is True - session.begin.assert_called_once() - session.add.assert_called_once() - - def test_updates_existing_permission(self): - p1, session = _patched_session() - existing = MagicMock() - session.scalar.return_value = existing - - with p1: - from services.plugin.plugin_permission_service import PluginPermissionService - - result = PluginPermissionService.change_permission( - "t1", TenantPluginPermission.InstallPermission.ADMINS, TenantPluginPermission.DebugPermission.ADMINS - ) - - assert result is True - session.begin.assert_called_once() - assert existing.install_permission == TenantPluginPermission.InstallPermission.ADMINS - assert existing.debug_permission == TenantPluginPermission.DebugPermission.ADMINS - session.add.assert_not_called()