首页 开发教程 Java + Spring 到 Python + FastAPI (二)

Java + Spring 到 Python + FastAPI (二)

开发教程 2025年12月4日
271 浏览

FastAPI 中 Depends 应用级使用

除了单个函数方法的 Depends 外,还有整个应用层级的 Depends yield,以 Redis 初始化连接池为例。

连接池只在应用的启动的时候创建,停止时关闭。可以用 @app.on_event(\”startup\”) 和 @app.on_event(\”shutdown\”),但更推荐用 lifespan。

老用法:

# 逻辑被拆分在两个不同的函数里
@app.on_event(\"startup\")
async def startup_event():
    print(\"--- 启动:创建连接池 ---\")
    app.state.db_pool = await create_db_pool()

@app.on_event(\"shutdown\")
async def shutdown_event():
    print(\"--- 关闭:销毁连接池 ---\")
    await app.state.db_pool.close()

app = FastAPI()

新用法:

from contextlib import asynccontextmanager

@asynccontextmanager
async def lifespan(app: FastAPI):
    # --- 启动时 ---
    print(\"--- (Lifespan) 创建数据库连接池 (Engine)... ---\")
    db_pool_engine = await create_db_pool() # 昂贵的操作,只做一次
    
    # 关键:把 \'池\' 挂载到 app.state 上
    app.state.db_pool = db_pool_engine
    
    yield
    
    # --- 关闭时 ---
    print(\"--- (Lifespan) 关闭数据库连接池... ---\")
    await app.state.db_pool.close()

# 注册 lifespan
app = FastAPI(lifespan=lifespan)

app.state 是 FastAPI 提供的一个全局状态对象,用于存储 Redis、MySQL 连接池等比较重的对象。

from fastapi import Request, Depends

# 这是你的 \'Depends\' 依赖项
async def get_db_session(request: Request):
    # 1. 从 \'request\' 中拿到 \'app\',再拿到 \'state\'
    db_pool = request.app.state.db_pool
    
    # 2. 从 \'池\' 中获取一个 \'连接\'
    #    \'async with\' 语句会自动 \'借\' 和 \'还\'
    async with db_pool.acquire() as connection:
        # 3. 基于这个连接,创建事务
        async with connection.begin() as transaction:
            # 4. 创建一个轻量级的 Session
            #    (注意:不同DB库实现不同,这里是概念)
            session = create_session(connection, transaction) 
            
            try:
                # 5. \'yield\' 这个 session 给路由
                yield session
                # 6. 路由成功,事务自动 commit
            except Exception:
                # 7. 路由失败,事务自动 rollback
                await transaction.rollback()
                raise
@app.get(\"/users/{user_id}\")
async def get_user(
    user_id: int,
    # \'Depends\' 会自动执行第 2 步
    db: AsyncSession = Depends(get_db_session) 
):
    # \'db\' 就是那个被 \'yield\' 出来的、带事务的 session
    user = await user_repo.get_user(db, user_id)
    return user

总结: lifespan (启动) -> app.state (全局存储) -> Request (传递 app) -> Depends (读取 app.state) -> yield (注入路由)。

资源连接池全局使用

因为是 FastAPI 的,意味着必须在 http 环境内才能使用。如果在非 http 环境使用,需要这样改造。

先创建一个全局共用的 db_core.py。

# db_core.py
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, AsyncEngine
from sqlalchemy.orm import sessionmaker
from contextlib import asynccontextmanager

# 1. 在模块的顶层定义一个全局变量 (我们的“单例”)
#    它在所有导入此模块的文件中是 *共享* 的
db_engine: AsyncEngine | None = None
_async_session_maker = None

async def create_db_pool():
    global db_engine, _async_session_maker
    
    print(\"--- (Core) 正在创建全局数据库连接池... ---\")
    db_engine = create_async_engine(
        \"mysql+aiomysql://...\", 
        pool_size=10, 
        max_overflow=5
    )
    _async_session_maker = sessionmaker(
        db_engine, class_=AsyncSession, expire_on_commit=False
    )

async def close_db_pool():
    global db_engine
    if db_engine:
        print(\"--- (Core) 正在关闭全局数据库连接池... ---\")
        await db_engine.dispose()

# 2. 这就是你的“事务”依赖项!
#    注意:它现在 *不* 需要 \'request: Request\' 了!
@asynccontextmanager
async def get_db_session():
    if not _async_session_maker:
        raise Exception(\"数据库连接池未初始化,请先调用 create_db_pool()\")

    # _async_session_maker() 会自动从 \'db_engine\' (池) 中获取连接
    async with _async_session_maker() as session:
        try:
            yield session
            await session.commit()
        except Exception:
            await session.rollback()
            raise

在 FastAPI 中使用 lifespan 包裹。

# main.py
from fastapi import FastAPI, Depends
from contextlib import asynccontextmanager
from db_core import create_db_pool, close_db_pool, get_db_session

@asynccontextmanager
async def lifespan(app: FastAPI):
    # \'lifespan\' 负责触发全局单例的初始化
    await create_db_pool()
    yield
    await close_db_pool()

app = FastAPI(lifespan=lifespan)

@app.get(\"/users/{user_id}\")
async def get_user(
    user_id: int,
    # 魔法!\'get_db_session\' 现在是一个“通用”依赖项
    # FastAPI 会自动处理 @asynccontextmanager
    db: AsyncSession = Depends(get_DBSession) 
):
    # \'db\' 就是那个带事务的 session
    user = await user_repo.get_user(db, user_id)
    return user

在非 http 环境中,则自己维护生命周期。比如 Kafka 消费。

# main.py
from fastapi import FastAPI, Depends
from contextlib import asynccontextmanager
from db_core import create_db_pool, close_db_pool, get_db_session

@asynccontextmanager
async def lifespan(app: FastAPI):
    # \'lifespan\' 负责触发全局单例的初始化
    await create_db_pool()
    yield
    await close_db_pool()

app = FastAPI(lifespan=lifespan)

@app.get(\"/users/{user_id}\")
async def get_user(
    user_id: int,
    # 魔法!\'get_db_session\' 现在是一个“通用”依赖项
    # FastAPI 会自动处理 @asynccontextmanager
    db: AsyncSession = Depends(get_DBSession) 
):
    # \'db\' 就是那个带事务的 session
    user = await user_repo.get_user(db, user_id)
    return user

用哪种取决于项目,纯 http API 项目全在 FastAPI 里面配合 BackgroundTasks 就够了,如果还有非 http 入口的处理,比如定时任务、Kafka 消费等,则用第二种。

BackgroundTasks 的作用和异步线程类似,比如用户注册成功后要发个邮件通知,这就可以用 BackgroundTasks 做,只不过原理不是真的有个异步线程池,而是 FastAPI 内部维护了一个 list 列表保存这些任务,在 http 处理完毕后,将 list 给 Ready Queue 执行,例如。

from fastapi import FastAPI, BackgroundTasks

app = FastAPI()

# 这是一个模拟的、\'async\' 的邮件发送函数
async def send_welcome_email(email: str, name: str):
    # 模拟 2 秒钟的 I/O 延迟
    await asyncio.sleep(2)
    print(f\"--- [后台任务]:邮件已发送给 {email} (姓名: {name}) ---\")
    
@app.post(\"/users\")
async def create_user(
    email: str,
    name: str,
    background_tasks: BackgroundTasks  # 1. 在这里声明
):
    # ... 你的核心业务逻辑,比如保存用户到数据库 ...
    print(f\"--- (路由) 正在创建用户 {name} ... ---\")
    
    # 2. 添加后台任务
    # 它不会立即执行!
    background_tasks.add_task(
        send_welcome_email,  # 你要调用的函数
        email,               # 位置参数
        name=name            # 关键字参数
    )
    
    # 3. 立即返回响应
    print(f\"--- (路由) 立即返回响应给用户 ---\")
    return {\"message\": f\"用户 {name} 创建成功,欢迎邮件将在后台发送。\"}

遇到 CPU 密集型任务怎么办?

在上一篇文章里面详细说过,为了方便做内存回收,Python 用 GLC 保证在同一个时刻只有一个线程拿到锁,io 任务除外。

所以 Python 没有多线程,即使用 asyncio.to_thread 执行,也会占用 GLC。

要真正的运行 CPU 密集型任务,只能用多进程。每个进程有自己的 GLC,相互不干扰,即新启一个 Python 程序。

import asyncio
from concurrent.futures import ProcessPoolExecutor

def my_cpu_heavy_task():
    # ... (5 秒钟的 CPU 密集型计算) ...
    return \"Done\"

# 你需要自己管理这个进程池
cpu_pool = ProcessPoolExecutor()

@app.get(\"/cpu-task\")
async def run_cpu_task():
    loop = asyncio.get_running_loop()
    
    # 告诉事件循环:
    # \"请在 \'cpu_pool\' (一个完全独立的进程) 里运行这个任务\"
    result = await loop.run_in_executor(
        cpu_pool,  # <--- 使用进程池,而不是线程池
        my_cpu_heavy_task
    )
    
    return {\"result\": result}

yield 的原理

yield 是暂停一个函数,return 是返回一个函数。yield 把一个普通函数变为了生成器(Generator)。

def my_generator():
    print(\"--- 1. 生成器开始 ---\")
    yield \"Hello\"
    print(\"--- 2. 我从 \'Hello\' 之后被唤醒了 ---\")
    yield \"World\"
    print(\"--- 3. 我从 \'World\' 之后被唤醒了 ---\")
    # 没有更多的 yield,函数自动结束

类似 Java 中 Iterator,每一个 yield 都是 next(),先判断 hasNext(),再执行。

在堆栈层面,普通的 return 执行后,堆栈里面的临时变量都销毁,而 yield 相当于暂停操作,保留内存中的数据,生命周期是创建 -> 暂停 -> 恢复 -> … -> 死亡。

关键词和组件区分

Python 的语言关键词有:async def / await、yield、with/async with,语言特性有 @…(只是提供@符号),Type Hints、装饰器(将函数变为方法入参,类似 Spring 的环绕通知),标准库有 asyncio、contextlib、typing。

FastAPI 的有 Depends(和 yield 搭配一起用),@app.get、 @app.post、@app.on_event(\”startup\”) 等框架装饰器,lifespan http 应用的生命周期,Pydantic(用 Type Hints 做校验和类型转换),Uvicorn(类似 Tomcat 容器)。

发表评论
暂无评论

还没有评论呢,快来抢沙发~

客服

点击联系客服 点击联系客服

在线时间:09:00-18:00

关注微信公众号

关注微信公众号
客服电话

400-888-8888

客服邮箱 122325244@qq.com

手机

扫描二维码

手机访问本站

扫描二维码
搜索