FastAPI依赖注入与上下文管理

avatar
cmdragon 渡劫
image image

扫描二维码关注或者微信搜一搜:编程智域 前端至全栈交流与成长

探索数千个预构建的 AI 应用,开启你的下一个伟大创意

FastAPI框架依赖注入与上下文管理实战指南

1. 全局依赖配置原理与实现

1.1 全局依赖的核心作用

全局依赖是FastAPI实现跨路由通用逻辑的关键机制,其核心作用包括:

  • 统一处理认证鉴权
  • 标准化响应格式
  • 集中收集请求日志
  • 管理数据库会话生命周期
  • 实施统一速率限制
1
2
3
4
5
6
7
8
9
10
11
12
from fastapi import Depends, FastAPI, Header

app = FastAPI()


async def verify_token(authorization: str = Header(...)):
if not authorization.startswith("Bearer "):
raise HTTPException(status_code=401)
return authorization[7:]


app = FastAPI(dependencies=[Depends(verify_token)])

1.2 多层级依赖配置

FastAPI支持灵活的依赖注入层级:

层级类型作用范围典型应用场景
全局依赖所有路由身份认证、请求日志
路由组依赖指定路由组API版本控制、权限分级
单路由依赖单个路由特殊参数校验、业务级权限

1.3 数据库会话实战案例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
from contextlib import asynccontextmanager
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import sessionmaker

DATABASE_URL = "postgresql+asyncpg://user:password@localhost/db"

engine = create_async_engine(DATABASE_URL)
async_session = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)


@asynccontextmanager
async def lifespan(app: FastAPI):
# 应用启动时执行
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
yield
# 应用关闭时执行
await engine.dispose()


async def get_db():
async with async_session() as session:
try:
yield session
await session.commit()
except Exception:
await session.rollback()
raise


app = FastAPI(lifespan=lifespan, dependencies=[Depends(get_db)])

2. 应用生命周期管理

2.1 生命周期事件实战

1
2
3
4
5
6
7
8
9
10
11
12
13
14
from fastapi import FastAPI
from contextlib import asynccontextmanager


@asynccontextmanager
async def lifespan(app: FastAPI):
# 启动时初始化Redis连接池
app.state.redis = await create_redis_pool()
yield
# 关闭时释放资源
await app.state.redis.close()


app = FastAPI(lifespan=lifespan)

2.2 全局状态管理

1
2
3
4
5
6
7
8
9
10
11
12
13
from fastapi import FastAPI, Request

app = FastAPI()


@app.middleware("http")
async def add_process_time_header(request: Request, call_next):
start_time = time.time()
response = await call_next(request)
process_time = time.time() - start_time
# 记录到全局状态
request.app.state.request_count += 1
return response

3. 综合应用案例:电商系统架构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
from fastapi import APIRouter, Depends
from pydantic import BaseModel


class ProductCreate(BaseModel):
name: str
price: float
stock: int


router = APIRouter(prefix="/products")


@router.post("")
async def create_product(
product_data: ProductCreate,
db: AsyncSession = Depends(get_db),
redis=Depends(get_redis)
):
# 检查商品名称重复
existing = await db.execute(
select(Product).filter(Product.name == product_data.name)
)
if existing.scalar():
raise HTTPException(400, "Product name exists")

# 写入数据库
new_product = Product(**product_data.dict())
db.add(new_product)
await db.commit()

# 更新缓存
await redis.delete("product_list")

return {"id": new_product.id}

课后Quiz

Q1:当遇到数据库连接池耗尽问题时,应该如何排查?
A. 检查数据库服务器状态
B. 增加连接池最大连接数
C. 检查是否忘记释放会话
D. 所有以上选项

正确答案:D。连接池问题需要综合排查,包括服务器资源、配置参数和代码逻辑。

Q2:为什么推荐使用yield方式管理数据库会话?
A. 实现事务的自动提交
B. 确保异常时回滚事务
C. 自动关闭会话连接
D. 所有以上选项

正确答案:D。yield语法可以完美实现会话的生命周期管理。

常见报错解决方案

错误1:RuntimeError: No response returned.
原因:依赖项中未正确返回响应
解决:

1
2
3
4
5
6
async def auth_dependency():
try:
# 验证逻辑
yield
except Exception as e:
return JSONResponse(status_code=401, content={"error": str(e)})

错误2:sqlalchemy.exc.InterfaceError: Connection closed unexpectedly
原因:数据库连接超时
预防:

1
2
3
4
5
6
engine = create_async_engine(
DATABASE_URL,
pool_size=20,
max_overflow=10,
pool_timeout=30
)

错误3:pydantic.error_wrappers.ValidationError
原因:请求体数据验证失败
排查步骤:

  1. 检查请求头Content-Type是否正确
  2. 验证请求体JSON格式
  3. 检查Pydantic模型定义
  4. 使用curl测试请求:
1
2
3
curl -X POST http://localhost:8000/items \
-H "Content-Type: application/json" \
-d '{"name":"example", "price": 9.99}'

余下文章内容请点击跳转至 个人博客页面 或者 扫码关注或者微信搜一搜:编程智域 前端至全栈交流与成长,阅读完整的文章:

往期文章归档: