JWT令牌如何在FastAPI中实现安全又高效的生成与验证?

avatar
cmdragon 大乘
image image

扫描二维码
关注或者微信搜一搜:编程智域 前端至全栈交流与成长

探索数千个预构建的 AI 应用,开启你的下一个伟大创意https://tools.cmdragon.cn/

第四章:JWT 令牌的生成与验证机制

1. JWT 基础概念

JSON Web Token(JWT)是一种开放标准(RFC 7519),用于在双方之间安全地传递声明信息。它由三部分组成:

  • Header(头部):描述算法和令牌类型
  • Payload(载荷):携带用户数据(如用户ID)和声明(如过期时间)
  • Signature(签名):用于验证令牌完整性的加密字符串

JWT 在 FastAPI 中的典型应用场景:

  • 用户身份认证
  • API 接口授权
  • 跨服务的安全通信
  • 无状态会话管理

2. 环境准备

安装所需依赖库(推荐使用虚拟环境):

1
pip install fastapi==0.95.2 python-jose[cryptography]==3.3.0 passlib==1.7.4 bcrypt==4.0.1 uvicorn==0.22.0

3. 生成 JWT 令牌

3.1 核心配置类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
from datetime import datetime, timedelta
from jose import JWTError, jwt
from passlib.context import CryptContext
from pydantic import BaseModel

# 安全配置
SECRET_KEY = "your-secret-key-here" # 生产环境应从环境变量获取
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30

# 密码哈希配置
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")


class Token(BaseModel):
access_token: str
token_type: str


class TokenData(BaseModel):
username: str | None = None

3.2 令牌生成函数

1
2
3
4
5
6
7
8
9
def create_access_token(data: dict, expires_delta: timedelta | None = None):
to_encode = data.copy()
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(minutes=15)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt

3.3 登录接口实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordRequestForm

router = APIRouter()


@router.post("/token", response_model=Token)
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
user = authenticate_user(fake_users_db, form_data.username, form_data.password)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": user.username}, expires_delta=access_token_expires
)
return {"access_token": access_token, "token_type": "bearer"}

4. JWT 验证机制

4.1 令牌验证中间件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
from fastapi.security import OAuth2PasswordBearer

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")


async def get_current_user(token: str = Depends(oauth2_scheme)):
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
if username is None:
raise credentials_exception
token_data = TokenData(username=username)
except JWTError:
raise credentials_exception
user = get_user(fake_users_db, username=token_data.username)
if user is None:
raise credentials_exception
return user

4.2 受保护路由示例

1
2
3
@router.get("/users/me/")
async def read_users_me(current_user: User = Depends(get_current_user)):
return current_user

5. 令牌刷新机制

实现令牌刷新接口:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@router.post("/refresh")
async def refresh_token(refresh_token: str):
try:
payload = jwt.decode(refresh_token, SECRET_KEY, algorithms=[ALGORITHM])
username = payload.get("sub")
if username is None:
raise HTTPException(status_code=400, detail="Invalid token")

# 检查用户是否存在(需要实现具体数据库查询)
user = get_user(username)
if not user:
raise HTTPException(status_code=404, detail="User not found")

new_token = create_access_token(data={"sub": user.username})
return {"access_token": new_token}

except JWTError:
raise HTTPException(status_code=401, detail="Invalid token")

课后Quiz

  1. 为什么JWT需要设置过期时间?
    A) 减少服务器内存占用
    B) 防止令牌被长期盗用
    C) 提高加密强度
    D) 简化开发流程

  2. 以下哪个做法会破坏JWT的安全性?
    A) 使用HTTPS传输令牌
    B) 将敏感数据存储在Payload中
    C) 定期轮换加密密钥
    D) 验证签名算法

  3. 如何处理令牌过期的情况?
    A) 返回500错误
    B) 要求用户重新登录
    C) 使用refresh token获取新令牌
    D) 自动延长过期时间

答案:

  1. B - 设置过期时间可限制令牌有效期,降低被盗用后的风险
  2. B - Payload内容虽然被加密但可被解码,不应存储敏感信息
  3. C - 最佳实践是通过refresh token机制更新访问令牌

常见报错解决方案

  1. 401 Unauthorized: Could not validate credentials

    • 原因:无效的令牌格式或签名不匹配
    • 解决:检查请求头的Bearer token格式,验证密钥一致性
  2. 422 Validation Error

    • 原因:请求体与Pydantic模型不匹配
    • 预防:使用精确的模型定义,添加字段验证规则
  3. 500 Internal Server Error: JWTError

    • 原因:令牌解码失败或算法不匹配
    • 处理:捕获JWTError异常,返回401状态码
    • 检查:确保服务端使用的算法与生成令牌时一致
  4. AttributeError: 'NoneType' has no attribute 'username'

    • 原因:数据库查询返回空值
    • 修复:在数据库查询后添加空值检查
    • 优化:使用Optional类型注解和空值处理

最佳实践建议:

  1. 生产环境使用RSA非对称加密(RS256算法)
  2. 将密钥存储在环境变量或密钥管理服务中
  3. 设置合理的令牌有效期(通常访问令牌15分钟,刷新令牌7天)
  4. 实现令牌撤销清单(黑名单机制)

余下文章内容请点击跳转至 个人博客页面 或者 扫码关注或者微信搜一搜:编程智域 前端至全栈交流与成长,阅读完整的文章:

往期文章归档: