深入解析事务基础与原子操作原理

avatar
cmdragon 大乘
image image

扫描二维码关注或者微信搜一搜:编程智域 前端至全栈交流与成长

探索数千个预构建的 AI 应用,开启你的下一个伟大创意

1. 事务基础与原子操作原理

1.1 事务的ACID特性

在数据库操作中,事务需要满足ACID特性:

  • 原子性(Atomicity):操作要么全部成功,要么全部失败
  • 一致性(Consistency):保持数据库的完整性约束
  • 隔离性(Isolation):并发事务相互隔离
  • 持久性(Durability):提交后永久保存
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# 示例:银行转账的原子操作
from fastapi import APIRouter, Depends
from tortoise.transactions import atomic

router = APIRouter()


@router.post("/transfer")
@atomic() # 使用装饰器包裹事务范围
async def transfer_money(
from_account: str,
to_account: str,
amount: float
):
# 扣减转出账户
from_acc = await Account.get(number=from_account)
from_acc.balance -= amount
await from_acc.save()

# 增加转入账户
to_acc = await Account.get(number=to_account)
to_acc.balance += amount
await to_acc.save()

return {"message": "转账成功"}

1.2 事务隔离级别对比

级别脏读不可重复读幻读适用场景
读未提交可能可能可能低并发场景
读已提交禁止可能可能默认级别
可重复读禁止禁止可能金融系统
串行化禁止禁止禁止高精度要求

2. 嵌套事务实现与回滚点

2.1 嵌套事务上下文管理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
from tortoise.transactions import in_transaction


async def complex_operation():
async with in_transaction() as conn1: # 外层事务
await Model1.create(...)

try:
async with in_transaction(connection=conn1) as conn2: # 内层事务
await Model2.create(...)
await conn2.rollback() # 仅回滚内层操作
except Exception:
pass

await Model3.create(...) # 外层事务继续执行

2.2 回滚点(Savepoint)使用

1
2
3
4
5
6
7
8
9
10
async def savepoint_demo():
async with in_transaction() as conn:
savepoint = await conn.savepoint() # 创建回滚点

try:
await Model.create(...)
if error_condition:
await savepoint.rollback() # 回滚到保存点
except Exception:
await savepoint.rollback()

3. 完整实战案例:订单系统

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
from fastapi import Depends
from tortoise.contrib.fastapi import register_tortoise
from tortoise.models import Model
from tortoise import fields


# 数据模型定义
class User(Model):
id = fields.IntField(pk=True)
balance = fields.DecimalField(max_digits=10, decimal_places=2)


class Order(Model):
id = fields.UUIDField(pk=True)
user = fields.ForeignKeyField('models.User')
amount = fields.DecimalField(max_digits=10, decimal_places=2)
status = fields.CharField(max_length=20)


# 事务处理服务
class OrderService:
@staticmethod
@atomic()
async def create_order(user_id: int, amount: float):
user = await User.get(id=user_id)

# 检查余额
if user.balance < amount:
raise ValueError("余额不足")

# 扣减余额
user.balance -= amount
await user.save()

# 创建订单记录
order = await Order.create(
user=user,
amount=amount,
status="PENDING"
)

# 模拟第三方支付调用
if not await call_payment_gateway():
await OrderService.rollback_order(order.id)
raise Exception("支付失败")

return order

@staticmethod
@atomic()
async def rollback_order(order_id: str):
order = await Order.get(id=order_id)
user = await order.user

# 恢复用户余额
user.balance += order.amount
await user.save()

# 更新订单状态
order.status = "CANCELED"
await order.save()

4. 常见报错解决方案

4.1 TransactionManagementError

错误现象
TransactionManagementError: Transaction not found for current context

解决方法

  1. 检查事务装饰器的使用范围
  2. 确保异步函数正确使用async/await
  3. 验证数据库连接配置是否正确

4.2 死锁检测

错误日志
Deadlock found when trying to get lock

处理方案

1
2
3
4
5
6
7
8
9
10
11
12
13
14
from tortoise.exceptions import OperationalError
from fastapi import HTTPException


async def safe_transaction():
try:
async with in_transaction():
# 数据库操作
except OperationalError as e:
if "Deadlock" in str(e):
await asyncio.sleep(0.1) # 随机延迟后重试
return await safe_transaction()
else:
raise HTTPException(status_code=500, detail="数据库错误")

5. 课后Quiz

5.1 事务隔离问题

问题:在可重复读隔离级别下,如何处理余额校验时的并发修改?

答案解析
使用SELECT FOR UPDATE锁定记录:

1
2
3
4
async def update_balance(user_id: int):
async with in_transaction():
user = await User.select_for_update().get(id=user_id)
# 后续操作

5.2 嵌套事务回滚

问题:当外层事务捕获内层事务的异常时,如何保证部分提交?

正确答案
使用保存点机制:

1
2
3
4
5
6
7
8
async def nested_transaction():
async with in_transaction() as conn:
savepoint = await conn.savepoint()
try:
# 内层操作
except Exception:
await savepoint.rollback()
# 外层继续执行

6. 运行环境配置

安装依赖:

1
pip install fastapi tortoise-orm uvicorn pydantic

启动配置:

1
2
3
4
5
6
7
8
9
10
11
12
# main.py
from fastapi import FastAPI

app = FastAPI()

register_tortoise(
app,
db_url='sqlite://db.sqlite3',
modules={'models': ['models']},
generate_schemas=True,
add_exception_handlers=True
)

启动命令:

1
uvicorn main:app --reload

余下文章内容请点击跳转至 个人博客页面 或者 扫码关注或者微信搜一搜:编程智域 前端至全栈交流与成长,阅读完整的文章:

往期文章归档: