异步日志监控:FastAPI与MongoDB的高效整合之道

avatar
cmdragon 大乘
image image

扫描二维码
关注或者微信搜一搜:编程智域 前端至全栈交流与成长

探索数千个预构建的 AI 应用,开启你的下一个伟大创意https://tools.cmdragon.cn/

FastAPI与MongoDB日志监控系统整合实战

1. 环境准备与依赖安装

# 安装核心库
pip install fastapi==0.103.1 
pip install motor==3.3.2
pip install pydantic==1.10.7
pip install uvicorn==0.23.2

2. MongoDB异步连接配置

from fastapi import FastAPI, Depends
from motor.motor_asyncio import AsyncIOMotorClient
from pydantic import BaseModel
from datetime import datetime

app = FastAPI()


# MongoDB连接配置
async def get_db():
    client = AsyncIOMotorClient(
        "mongodb://admin:password@localhost:27017",
        maxPoolSize=10,
        minPoolSize=5
    )
    return client.log_db


# 日志数据模型
class LogEntry(BaseModel):
    timestamp: datetime
    level: str  # DEBUG/INFO/WARNING/ERROR
    service: str
    message: str
    metadata: dict = None

3. 核心功能实现

3.1 日志写入接口

@app.post("/logs")
async def create_log(log: LogEntry, db=Depends(get_db)):
    """异步写入日志到MongoDB"""
    log_dict = log.dict()
    result = await db.logs.insert_one(log_dict)
    return {"inserted_id": str(result.inserted_id)}

3.2 聚合管道查询示例

@app.get("/logs/stats")
async def get_log_stats(service: str, db=Depends(get_db)):
    """按服务统计错误日志数量"""
    pipeline = [
        {"$match": {
            "service": service,
            "level": "ERROR",
            "timestamp": {"$gte": datetime(2023, 1, 1)}
        }},
        {"$group": {
            "_id": "$service",
            "error_count": {"$sum": 1},
            "latest_error": {"$last": "$timestamp"}
        }}
    ]
    cursor = db.logs.aggregate(pipeline)
    results = await cursor.to_list(length=100)
    return results

3.3 索引优化实战

# 启动时创建索引
@app.on_event("startup")
async def create_indexes():
    db = await get_db()
    await db.logs.create_index([("timestamp", 1)], name="timestamp_asc")
    await db.logs.create_index(
        [("service", 1), ("level", 1)],
        name="service_level_compound"
    )

4. 性能优化技巧

4.1 批量写入优化

@app.post("/logs/bulk")
async def bulk_insert(logs: list[LogEntry], db=Depends(get_db)):
    """批量插入日志提升写入性能"""
    documents = [log.dict() for log in logs]
    result = await db.logs.insert_many(documents)
    return {"inserted_count": len(result.inserted_ids)}

4.2 查询分页实现

@app.get("/logs")
async def query_logs(
        page: int = 1,
        page_size: int = 50,
        db=Depends(get_db)
):
    """带分页的日志查询接口"""
    skip = (page - 1) * page_size
    cursor = db.logs.find().sort("timestamp", -1).skip(skip).limit(page_size)
    return await cursor.to_list(length=page_size)

5. 常见报错解决方案

5.1 422 Validation Error

现象:请求体字段类型不匹配
解决方案

  1. 检查pydantic模型字段类型定义
  2. 使用try/except块捕获ValidationError
from fastapi.exceptions import RequestValidationError


@app.exception_handler(RequestValidationError)
async def validation_exception_handler(request, exc):
    return JSONResponse(
        status_code=400,
        content={"detail": "请求数据格式错误"}
    )

5.2 MongoClient连接超时

现象ServerSelectionTimeoutError
排查步骤

  1. 检查MongoDB服务状态 systemctl status mongod
  2. 验证连接字符串格式是否正确
  3. 检查防火墙设置是否开放27017端口

6. 课后Quiz

问题1:当使用$match进行时间范围查询时,如何确保查询性能?
A) 使用内存缓存
B) 在timestamp字段创建索引
C) 增加数据库连接池

正确答案:B
解析:创建索引可以显著提升字段的查询效率,特别是对时间戳这种常用于范围查询的字段

问题2:在批量插入日志时,如何保证数据完整性?
A) 使用事务操作
B) 启用写入确认机制
C) 增加重试逻辑

正确答案:B
解析:MongoDB的写入确认(write concern)机制可以确保数据成功写入磁盘

7. 生产环境建议

  1. 连接池配置:根据业务负载调整maxPoolSize(建议10-100之间)
  2. 读写分离:为分析类查询配置secondary节点读取
  3. 慢查询监控:定期检查db.currentOp()的输出
  4. TTL索引:自动清理过期日志
# 创建7天过期的TTL索引
await db.logs.create_index(
    [("timestamp", 1)],
    name="logs_ttl",
    expireAfterSeconds=604800  # 7天
)

通过本文的完整实现方案,开发者可以快速构建日均千万级日志处理系统。实际部署时建议配合Prometheus进行性能监控,并使用Grafana实现可视化看板。

余下文章内容请点击跳转至 个人博客页面 或者 扫码关注或者微信搜一搜:编程智域 前端至全栈交流与成长,阅读完整的文章:

往期文章归档: