异步之舞:Motor驱动与MongoDB的CRUD交响曲

avatar
cmdragon 大乘
image image

扫描二维码
关注或者微信搜一搜:编程智域 前端至全栈交流与成长

探索数千个预构建的 AI 应用,开启你的下一个伟大创意https://tools.cmdragon.cn/

第二章:Motor 异步驱动与 CRUD 操作实践

1. Motor 异步驱动原理

MongoDB 的异步驱动 Motor 是专为 Python 异步框架设计的数据库连接器,其底层基于 asyncio 实现非阻塞 I/O 操作。与同步驱动相比,Motor
在执行数据库操作时不会阻塞事件循环,这使得 FastAPI 能够同时处理更多并发请求。

示例场景:想象餐厅里一个服务员(事件循环)同时服务多桌客人(请求),当某桌需要等待厨房做菜(数据库操作)时,服务员会先去服务其他餐桌,等厨房完成后再回来继续服务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# 安装依赖
# pip install fastapi==0.78.0 motor==2.5.0 pydantic==1.10.7

from fastapi import FastAPI
from motor.motor_asyncio import AsyncIOMotorClient
from pydantic import BaseModel

app = FastAPI()

# MongoDB 连接配置
DATABASE_URL = "mongodb://localhost:27017"
client = AsyncIOMotorClient(DATABASE_URL)
db = client["mydatabase"]
users_collection = db["users"]


class UserCreate(BaseModel):
name: str
age: int
email: str


class UserResponse(UserCreate):
id: str

2. CRUD 操作实现

2.1 创建文档

使用 insert_one 方法实现数据插入:

1
2
3
4
5
6
@app.post("/users", response_model=UserResponse)
async def create_user(user: UserCreate):
user_dict = user.dict()
result = await users_collection.insert_one(user_dict)
created_user = await users_collection.find_one({"_id": result.inserted_id})
return {**created_user, "id": str(created_user["_id"])}

2.2 查询文档

实现多条件查询和分页:

1
2
3
4
5
6
7
8
9
10
@app.get("/users", response_model=list[UserResponse])
async def get_users(skip: int = 0, limit: int = 10):
users = []
query = {"age": {"$gte": 18}} # 查询18岁以上用户
projection = {"_id": 0, "id": {"$toString": "$_id"}, "name": 1, "age": 1} # 字段投影

async for user in users_collection.find(query).skip(skip).limit(limit).project(projection):
users.append(user)

return users

2.3 更新文档

使用原子操作实现安全更新:

1
2
3
4
5
6
7
@app.put("/users/{user_id}")
async def update_user(user_id: str, user_update: UserCreate):
update_result = await users_collection.update_one(
{"_id": user_id},
{"$set": user_update.dict(exclude_unset=True)}
)
return {"modified_count": update_result.modified_count}

2.4 删除文档

软删除实现示例:

1
2
3
4
5
6
7
@app.delete("/users/{user_id}")
async def delete_user(user_id: str):
result = await users_collection.update_one(
{"_id": user_id},
{"$set": {"is_deleted": True}}
)
return {"modified_count": result.modified_count}

3. 聚合管道应用

统计用户年龄分布:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@app.get("/users/age-stats")
async def get_age_stats():
pipeline = [
{"$match": {"is_deleted": {"$ne": True}}},
{"$group": {
"_id": None,
"averageAge": {"$avg": "$age"},
"minAge": {"$min": "$age"},
"maxAge": {"$max": "$age"}
}}
]

result = await users_collection.aggregate(pipeline).to_list(1)
return result[0] if result else {}

4. 索引优化策略

4.1 单字段索引

1
2
3
4
# 创建索引
async def create_indexes():
await users_collection.create_index("email", unique=True)
await users_collection.create_index([("name", "text")])

4.2 复合索引

1
2
# 针对常用查询字段创建复合索引
await users_collection.create_index([("age", 1), ("is_deleted", 1)])

索引优化建议:

  1. 优先为查询条件字段建立索引
  2. 复合索引字段顺序遵循 ESR 规则(等值→排序→范围)
  3. 使用覆盖索引减少文档读取

课后 Quiz

Q1:Motor 的异步特性如何提升性能?

A) 减少数据库连接数
B) 允许单线程处理多个并发请求
C) 自动压缩传输数据
D) 缓存查询结果

答案B) 正确。异步驱动通过非阻塞 I/O 允许事件循环在处理数据库操作等待期间继续处理其他请求,提升并发处理能力。

Q2:如何防止重复插入相同 email 的用户?

A) 添加唯一索引
B) 在业务逻辑中检查
C) 使用事务
D) 以上都是

答案D) 正确。最佳实践是同时使用数据库唯一索引(A)和业务逻辑校验(B),在并发场景下可配合事务(C)保证数据一致性。

常见报错处理

报错1:ServerSelectionTimeoutError

现象:连接 MongoDB 超时

1
motor.motor_asyncio.ServerSelectionTimeoutError: ... 

解决

  1. 检查 MongoDB 服务是否运行
  2. 确认连接端口(默认27017)
  3. 验证防火墙设置

报错2:ValidationError

现象:请求参数校验失败

1
2
3
4
5
6
7
8
9
10
11
12
{
"detail": [
{
"loc": [
"body",
"age"
],
"msg": "field required",
"type": "value_error.missing"
}
]
}

处理

  1. 检查请求体是否符合 Pydantic 模型定义
  2. 使用 exclude_unset=True 处理可选字段
  3. 添加自定义验证器

报错3:DuplicateKeyError

现象:违反唯一性约束

1
pymongo.errors.DuplicateKeyError: E11000 duplicate key error...

处理

  1. 在插入前检查唯一字段
  2. 使用 update_one 配合 upsert=True
  3. 添加唯一索引确保数据一致性

通过本章学习,您将掌握 FastAPI 与 MongoDB 集成的核心技能。建议在开发过程中使用 MongoDB Compass 可视化工具实时观察数据变化,并结合
Python 的异步特性进行压力测试,深入理解异步编程的优势。

余下文章内容请点击跳转至 个人博客页面 或者 扫码关注或者微信搜一搜:编程智域 前端至全栈交流与成长,阅读完整的文章:

往期文章归档: