diff --git a/api/commands/rbac.py b/api/commands/rbac.py new file mode 100644 index 0000000000..92cc6e26a6 --- /dev/null +++ b/api/commands/rbac.py @@ -0,0 +1,109 @@ +from __future__ import annotations + +import click +from sqlalchemy import select + +from core.db.session_factory import session_factory +from models import TenantAccountJoin, TenantAccountRole +from services.enterprise.rbac_service import ListOption, RBACService + + +def _resolve_builtin_role_id(tenant_id: str, operator_account_id: str, legacy_role: str) -> str: + """Resolve a legacy workspace role to the current tenant's builtin RBAC role id. + + The migration replays the old `TenantAccountJoin.role` values onto the + RBAC member-role binding API. Builtin RBAC roles are tenant-scoped and + identified by runtime ids, so the command must look them up per tenant. + """ + expected_builtin_name = { + TenantAccountRole.OWNER.value: "所有者", + TenantAccountRole.ADMIN.value: "管理者", + TenantAccountRole.EDITOR.value: "编辑者", + TenantAccountRole.NORMAL.value: "普通用户", + TenantAccountRole.DATASET_OPERATOR.value: "知识库操作员", + }.get(legacy_role) + if not expected_builtin_name: + raise ValueError(f"Unsupported legacy workspace role: {legacy_role}") + + roles = RBACService.Roles.list( + tenant_id=tenant_id, + account_id=operator_account_id, + options=ListOption(page_number=1, results_per_page=100), + ).data + for role in roles: + if role.is_builtin and role.category == "global_system_default" and role.name == expected_builtin_name: + return str(role.id) + + raise ValueError(f"Builtin RBAC role not found for tenant={tenant_id}, legacy_role={legacy_role}") + + +@click.command("rbac-migrate-member-roles", help="Migrate legacy workspace member roles into RBAC member-role bindings.") +@click.option("--tenant-id", help="Only migrate a single workspace.") +@click.option("--dry-run", is_flag=True, default=False, help="Preview the migration without writing RBAC bindings.") +def migrate_member_roles_to_rbac(tenant_id: str | None, dry_run: bool) -> None: + """Backfill RBAC member-role bindings from legacy `TenantAccountJoin.role` data. + + This is an offline migration command for workspaces that already have + members in the legacy role model but need matching records in the RBAC + member-role binding store. + """ + click.echo(click.style("Starting RBAC member-role migration.", fg="green")) + + with session_factory.create_session() as session: + stmt = select(TenantAccountJoin).order_by(TenantAccountJoin.tenant_id.asc(), TenantAccountJoin.id.asc()) + if tenant_id: + stmt = stmt.where(TenantAccountJoin.tenant_id == tenant_id) + + joins = list(session.scalars(stmt).all()) + + if not joins: + click.echo(click.style("No workspace members found for migration.", fg="yellow")) + return + + owner_account_by_tenant: dict[str, str] = {} + resolved_role_ids: dict[tuple[str, str], str] = {} + migrated_count = 0 + + for join in joins: + workspace_id = str(join.tenant_id) + member_account_id = str(join.account_id) + legacy_role = str(join.role) + + if workspace_id not in owner_account_by_tenant: + owner_join = next( + ( + item + for item in joins + if str(item.tenant_id) == workspace_id and str(item.role) == TenantAccountRole.OWNER.value + ), + None, + ) + if not owner_join: + raise ValueError(f"Workspace owner not found for tenant={workspace_id}") + owner_account_by_tenant[workspace_id] = str(owner_join.account_id) + + operator_account_id = owner_account_by_tenant[workspace_id] + cache_key = (workspace_id, legacy_role) + if cache_key not in resolved_role_ids: + resolved_role_ids[cache_key] = _resolve_builtin_role_id(workspace_id, operator_account_id, legacy_role) + + resolved_role_id = resolved_role_ids[cache_key] + click.echo( + f"tenant={workspace_id} member={member_account_id} legacy_role={legacy_role} -> rbac_role_id={resolved_role_id}" + ) + + if dry_run: + continue + + RBACService.MemberRoles.replace( + tenant_id=workspace_id, + account_id=operator_account_id, + member_account_id=member_account_id, + role_ids=[resolved_role_id], + ) + migrated_count += 1 + + if dry_run: + click.echo(click.style("Dry run completed. No RBAC bindings were written.", fg="yellow")) + else: + click.echo(click.style(f"RBAC member-role migration completed. Migrated {migrated_count} members.", fg="green"))