用户认证的魔法配方:从模型设计到密码安全的奇幻之旅

avatar
cmdragon 大乘
image image

扫描二维码
关注或者微信搜一搜:编程智域 前端至全栈交流与成长

探索数千个预构建的 AI 应用,开启你的下一个伟大创意https://tools.cmdragon.cn/

第四章:用户认证体系搭建

1. 用户模型设计与数据库集成

1.1 用户模型设计原则

用户模型是认证系统的核心数据结构,需要包含以下基础字段:

  • id:主键标识符(建议使用UUID)
  • username:唯一用户名(带格式校验)
  • email:唯一电子邮箱(带格式校验)
  • hashed_password:加密后的密码
  • is_active:账户激活状态
  • created_at:账户创建时间戳

使用SQLAlchemy ORM的示例模型:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
from datetime import datetime
from uuid import uuid4
from sqlalchemy import Column, String, Boolean, DateTime
from sqlalchemy.ext.declarative import declarative_base

Base = declarative_base()


class User(Base):
__tablename__ = "users"

id = Column(String(36), primary_key=True, default=lambda: str(uuid4()))
username = Column(String(50), unique=True, nullable=False)
email = Column(String(255), unique=True, nullable=False)
hashed_password = Column(String(255), nullable=False)
is_active = Column(Boolean, default=True)
created_at = Column(DateTime, default=datetime.utcnow)

def __repr__(self):
return f"<User {self.username}>"

1.2 数据库集成配置

推荐使用异步数据库驱动提升性能,以下是PostgreSQL配置示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# database.py
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker

DATABASE_URL = "postgresql+asyncpg://user:password@localhost/dbname"

engine = create_async_engine(DATABASE_URL)
AsyncSessionLocal = sessionmaker(
bind=engine,
class_=AsyncSession,
expire_on_commit=False
)


async def get_db():
async with AsyncSessionLocal() as session:
yield session

使用Alembic进行数据库迁移:

1
2
3
4
5
6
7
8
# 初始化迁移环境
alembic init migrations

# 生成迁移文件
alembic revision --autogenerate -m "create users table"

# 执行迁移
alembic upgrade head

2. 用户密码安全规范

2.1 密码存储最佳实践

密码存储必须遵循以下安全准则:

  • 禁止明文存储
  • 使用强哈希算法(推荐bcrypt)
  • 自动加盐处理
  • 哈希迭代次数不少于12次

密码处理工具类实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# security.py
from passlib.context import CryptContext

pwd_context = CryptContext(
schemes=["bcrypt"],
deprecated="auto",
bcrypt__rounds=12
)


def verify_password(plain_password: str, hashed_password: str) -> bool:
return pwd_context.verify(plain_password, hashed_password)


def get_password_hash(password: str) -> str:
return pwd_context.hash(password)

增强版用户模型:

1
2
3
4
5
6
7
8
class User(Base):
# ...其他字段同上

def set_password(self, password: str):
self.hashed_password = get_password_hash(password)

def check_password(self, password: str) -> bool:
return verify_password(password, self.hashed_password)

2.2 密码验证流程设计

密码验证应包含多级安全检查:

1
2
3
4
5
6
7
8
9
10
11
12
from pydantic import BaseModel, constr


class UserCreate(BaseModel):
username: constr(min_length=4, max_length=50)
email: str
password: constr(min_length=8)


class UserLogin(BaseModel):
username: str
password: str

注册路由实现示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.ext.asyncio import AsyncSession

router = APIRouter()


@router.post("/register")
async def register(
user_data: UserCreate,
db: AsyncSession = Depends(get_db)
):
# 检查用户名是否已存在
existing_user = await db.execute(
select(User).where(User.username == user_data.username)
)
if existing_user.scalars().first():
raise HTTPException(400, "Username already registered")

# 创建用户对象
new_user = User(
username=user_data.username,
email=user_data.email
)
new_user.set_password(user_data.password)

# 保存到数据库
db.add(new_user)
await db.commit()
await db.refresh(new_user)

return {"message": "User created successfully"}

3. 课后Quiz

问题1:以下哪种密码存储方式最安全?
A) MD5哈希
B) SHA256哈希
C) Bcrypt哈希
D) 明文存储

答案与解析:选C。Bcrypt是专门为密码存储设计的哈希算法,包含自动加盐和可调节计算成本的特点,相比MD5和SHA256这类快速哈希算法,能更有效防御暴力破解。

问题2:为什么用户模型需要is_active字段?
A) 记录用户最后登录时间
B) 实现账户软删除功能
C) 控制API访问频率
D) 存储用户偏好设置

答案与解析:选B。is_active字段用于实现账户的启用/禁用状态管理,当设置为False时,即使用户凭证正确也不允许登录,实现软删除而不丢失数据。

4. 常见报错解决方案

报错1422 Unprocessable Entity

  • 现象:请求体参数验证失败
  • 解决方法:
    1. 检查请求体是否符合Pydantic模型定义
    2. 验证密码字段是否满足最小长度要求
    3. 确认Content-Type头设置为application/json

报错2asyncpg.exceptions.UniqueViolationError

  • 现象:违反数据库唯一约束
  • 解决方法:
    1. 在插入数据前检查用户名/邮箱是否已存在
    2. 添加数据库唯一索引
    3. 使用事务处理保证数据一致性

报错3AttributeError: 'NoneType' object has no attribute 'check_password'

  • 现象:用户对象查询为空
  • 解决方法:
    1. 检查数据库查询是否返回有效结果
    2. 确认用户名拼写是否正确
    3. 验证数据库连接是否正常

5. 环境依赖说明

运行本示例需要以下依赖:

1
2
3
4
5
6
7
8
fastapi==0.68.2
uvicorn==0.15.0
sqlalchemy==1.4.35
asyncpg==0.24.0
passlib==1.7.4
python-multipart==0.0.5
alembic==1.7.5
pydantic==1.8.2

安装命令:

1
pip install fastapi uvicorn sqlalchemy asyncpg passlib python-multipart alembic pydantic

本示例已通过PostgreSQL 13和Python 3.9验证,建议使用虚拟环境运行。数据库连接字符串需要根据实际环境修改,开发阶段可使用SQLite进行快速验证。

余下文章内容请点击跳转至 个人博客页面 或者 扫码关注或者微信搜一搜:编程智域 前端至全栈交流与成长,阅读完整的文章:

往期文章归档: