FastAPI数据库集成与事务管理

avatar
cmdragon 大乘
image image

扫描二维码关注或者微信搜一搜:编程智域 前端至全栈交流与成长

探索数千个预构建的 AI 应用,开启你的下一个伟大创意

FastAPI数据库集成与事务管理完全指南

1. 环境准备与基础配置

在项目根目录创建database.py文件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# 安装依赖:pip install fastapi uvicorn sqlalchemy pydantic
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

# 配置SQLite数据库连接(生产环境建议使用PostgreSQL)
SQLALCHEMY_DATABASE_URL = "sqlite:///./test.db"
engine = create_engine(
SQLALCHEMY_DATABASE_URL,
connect_args={"check_same_thread": False},
echo=True # 显示生成的SQL语句
)

# 创建数据库会话工厂
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

# 声明性基类
Base = declarative_base()

2. 模型定义与表结构映射

1
2
3
4
5
6
7
8
9
10
11
12
13
14
from sqlalchemy import Column, Integer, String


class User(Base):
__tablename__ = "users"

id = Column(Integer, primary_key=True, index=True)
name = Column(String(50), nullable=False)
email = Column(String(100), unique=True)
balance = Column(Integer, default=0)


# 创建数据库表(生产环境建议使用迁移工具)
Base.metadata.create_all(bind=engine)

3. Pydantic数据模型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
from pydantic import BaseModel


class UserCreate(BaseModel):
name: str
email: str
balance: int = 0


class UserResponse(BaseModel):
id: int
name: str
email: str
balance: int

class Config:
orm_mode = True # 启用ORM模式转换

4. 路由与数据库操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.orm import Session

router = APIRouter()


# 依赖项获取数据库会话
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()


@router.post("/users/", response_model=UserResponse)
def create_user(user: UserCreate, db: Session = Depends(get_db)):
# 显式开始事务
transaction = db.begin()
try:
db_user = User(**user.dict())
db.add(db_user)
db.commit()
db.refresh(db_user)
return db_user
except Exception as e:
transaction.rollback()
raise HTTPException(
status_code=400,
detail=f"创建用户失败: {str(e)}"
)

5. 事务控制与错误处理

多操作事务示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
def transfer_funds(sender_id: int, receiver_id: int, amount: int, db: Session):
transaction = db.begin()
try:
# 获取发送方账户
sender = db.query(User).filter(User.id == sender_id).with_for_update().first()
if not sender or sender.balance < amount:
raise ValueError("余额不足或账户不存在")

# 获取接收方账户
receiver = db.query(User).filter(User.id == receiver_id).with_for_update().first()
if not receiver:
raise ValueError("接收方账户不存在")

# 执行转账
sender.balance -= amount
receiver.balance += amount

db.commit()
return {"message": "转账成功"}

except Exception as e:
transaction.rollback()
raise HTTPException(status_code=400, detail=str(e))

6. 常见报错解决方案

问题1:422 Validation Error

  • 现象:请求参数验证失败
  • 解决方法:
    1. 检查请求体是否符合Pydantic模型定义
    2. 使用Swagger UI测试接口
    3. 查看返回的detail字段中的具体错误信息

问题2:500 Internal Server Error

  • 现象:数据库连接失败
  • 解决方法:
    1. 检查数据库URL格式是否正确
    2. 验证数据库服务是否正常运行
    3. 检查数据库用户权限设置

问题3:IntegrityError (sqlalchemy.exc.IntegrityError)

  • 现象:违反数据库约束
  • 解决方法:
    1. 检查唯一性约束字段(如email)
    2. 验证外键关联是否存在
    3. 确保NOT NULL字段都有值

7. 课后Quiz

Q1:以下哪种情况会导致事务自动回滚?
A) 代码中显式调用commit()
B) 发生未捕获的异常
C) 使用with_for_update()
D) 调用refresh()方法

正确答案:B
解析:当数据库操作过程中出现未捕获的异常时,SQLAlchemy会自动回滚当前事务,保证数据一致性。

Q2:如何防止SQL注入攻击?
A) 使用字符串拼接查询
B) 始终使用ORM查询方法
C) 手动转义特殊字符
D) 关闭数据库日志

正确答案:B
解析:SQLAlchemy的ORM系统会自动处理参数化查询,避免直接拼接SQL语句,从根本上防止SQL注入。

Q3:什么情况下需要使用with_for_update()?
A) 需要提高查询性能
B) 处理并发写操作
C) 创建数据库索引
D) 执行批量插入

正确答案:B
解析:with_for_update()在事务中锁定查询行,防止其他事务修改,用于处理需要保证数据一致性的并发写操作场景。

通过本文的学习,您应该已经掌握FastAPI集成SQLAlchemy的核心方法,理解事务控制原理,并能够处理常见的数据库操作问题。建议在实际项目中结合Alembic进行数据库迁移管理,并配置连接池优化性能。

余下文章内容请点击跳转至 个人博客页面 或者 扫码关注或者微信搜一搜:编程智域 前端至全栈交流与成长,阅读完整的文章:

往期文章归档: