如何在FastAPI中打造一个既安全又灵活的权限管理系统?

avatar
cmdragon 大乘
image image

扫描二维码
关注或者微信搜一搜:编程智域 前端至全栈交流与成长

发现1000+提升效率与开发的AI工具和实用程序https://tools.cmdragon.cn/

1
2
3
4
5
6
7
# 所需环境配置(运行前请安装)
# fastapi==0.95.0
# uvicorn==0.21.1
# python-multipart==0.0.6
# sqlalchemy==1.4.46
# pydantic==1.10.7
# passlib==1.7.4

1. 权限系统核心原理

权限系统的本质是请求过滤机制,FastAPI 通过依赖注入系统实现层级验证。当请求到达时,会经历:

  • 身份认证 → 角色验证 → 权限校验 三级验证
  • 每个层级都是独立的依赖项
  • 权限数据存储在关系型数据库,实现动态管理

2. 数据库模型设计

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
from sqlalchemy import Boolean, Column, ForeignKey, Integer, String
from sqlalchemy.orm import relationship
from databases import Base


class User(Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True)
username = Column(String(50), unique=True)
hashed_password = Column(String(300))
is_active = Column(Boolean, default=True)
role_id = Column(Integer, ForeignKey("roles.id"))

role = relationship("Role", back_populates="users")


class Role(Base):
__tablename__ = "roles"
id = Column(Integer, primary_key=True)
name = Column(String(20), unique=True)
permissions = Column(String(500)) # 存储逗号分隔的权限标识
users = relationship("User", back_populates="role")


class PermissionRegistry(Base):
__tablename__ = "permission_registry"
id = Column(Integer, primary_key=True)
endpoint = Column(String(100)) # 路由路径
method = Column(String(10)) # HTTP方法
perm_code = Column(String(50)) # 权限标识

3. 权限验证依赖项

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
from fastapi import Depends, HTTPException, status
from pydantic import BaseModel


class PermissionValidator:
def __init__(self, required_perm: str):
self.required_perm = required_perm

async def __call__(self,
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db)):
# 获取角色关联的权限
role_perms = current_user.role.permissions.split(",")

# 验证权限是否存在
if self.required_perm not in role_perms:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="没有访问权限"
)

# 记录审计日志(示例)
audit_log = AuditLog(
user_id=current_user.id,
action=f"访问需要 {self.required_perm} 权限的端点"
)
db.add(audit_log)
db.commit()


# 使用示例
@app.get("/admin/dashboard")
async def admin_dashboard(
perm_check: bool = Depends(PermissionValidator("admin_dashboard"))):
return {"message": "欢迎来到管理面板"}

4. 动态路由权限注册

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
class PermissionRegistration(BaseModel):
endpoint: str
methods: List[str]
perm_code: str


@app.post("/manage/permissions")
async def register_permission(
perm_data: PermissionRegistration,
db: Session = Depends(get_db)
):
for method in perm_data.methods:
existing = db.query(PermissionRegistry).filter_by(
endpoint=perm_data.endpoint,
method=method
).first()

if not existing:
new_perm = PermissionRegistry(
endpoint=perm_data.endpoint,
method=method,
perm_code=perm_data.perm_code
)
db.add(new_perm)

db.commit()
return {"status": "权限注册成功"}

5. 实时权限检查中间件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@app.middleware("http")
async def dynamic_permission_check(request: Request, call_next):
# 跳过非业务端点
if request.url.path.startswith(("/docs", "/redoc")):
return await call_next(request)

# 查询权限注册表
db = SessionLocal()
perm_record = db.query(PermissionRegistry).filter_by(
endpoint=request.url.path,
method=request.method
).first()

if perm_record:
# 验证用户权限
current_user = await get_current_user(request)
if perm_record.perm_code not in current_user.role.permissions.split(","):
return JSONResponse(
status_code=403,
content={"detail": "权限不足"}
)

response = await call_next(request)
return response

课后Quiz

  1. 当用户访问需要”article.edit”权限的接口,但该用户的角色权限只有”article.view”,系统会返回什么状态码?
    A) 401 B) 403 C) 404 D) 500

答案:B) 403。系统在权限验证阶段发现用户权限不足时,会返回403 Forbidden状态码。401表示未认证,404是资源不存在,500是服务器内部错误。

  1. 如何防止权限注册接口被未授权访问?
    A) 添加JWT认证依赖
    B) 限制仅管理员角色可访问
    C) 同时实现A和B
    D) 不需要保护这个接口

答案:C) 同时实现A和B。应该在路由定义中添加类似Depends(PermissionValidator(“perm_management”))的依赖,同时在用户角色系统中设置管理员专属权限。

常见报错处理

  1. 422 Unprocessable Entity
    原因:请求体不符合Pydantic模型验证
    解决:检查字段类型是否正确,添加缺失的必填字段

  2. AttributeError: ‘NoneType’ has no attribute ‘permissions’
    原因:用户角色未正确关联
    解决:检查数据库中的角色关联关系,确保每个用户都有对应的角色

  3. 数据库连接超时
    预防:使用SQLAlchemy的连接池配置

1
2
3
4
5
6
7
SQLALCHEMY_DATABASE_URL = "postgresql://user:pass@localhost/dbname?connect_timeout=10"
engine = create_engine(
SQLALCHEMY_DATABASE_URL,
pool_size=20,
max_overflow=10,
pool_timeout=30
)

余下文章内容请点击跳转至 个人博客页面 或者 扫码关注或者微信搜一搜:编程智域 前端至全栈交流与成长,阅读完整的文章:

往期文章归档: