软件教程 2025年08月6日
0 收藏 0 点赞 137 浏览 7460 个字
摘要 :

文章目录 一、两种主流技术方案介绍 (一)Server-Sent Events(SSE)方案 (二)流式HTTP响应(NDJSON)方案 二、关键配置说明 (一)响应头设置 (二)数据格式选……




  • 一、两种主流技术方案介绍
    • (一)Server-Sent Events(SSE)方案
    • (二)流式HTTP响应(NDJSON)方案
  • 二、关键配置说明
    • (一)响应头设置
    • (二)数据格式选择
    • (三)前端处理建议
  • 三、完整工作流程示例(FastAPI + React)
    • (一)后端代码
    • (二)前端React组件代码
  • 四、注意事项
    • (一)连接管理
    • (二)性能优化
    • (三)安全考虑
    • (四)错误处理增强

    最近不少小伙伴都在研究怎么通过API把Deepseek的响应内容以流式的方式输出到前端,今天咱就来详细说说这个事儿。主要以Python后端搭配前端JavaScript为例,给大家讲讲具体的实现方案。

    一、两种主流技术方案介绍

    (一)Server-Sent Events(SSE)方案

    这是一种浏览器原生支持的流式传输方案,用起来比较方便,优先推荐给大家。下面这段是Flask框架下的示例代码:

    # Flask 示例
    from flask import Response, stream_with_context
    
    @app.route(\'/stream\')
    def stream_data():
        # 定义一个生成器函数generate,用于生成流式数据
        def generate():
            # 调用Deepseek的API获取聊天完成的响应,设置stream=True开启流式传输
            response = client.chat.completions.create(
                model=\"deepseek-chat\",
                messages=messages,
                stream=True
            )
            
            # 遍历响应的每一个数据块
            for chunk in response:
                # 检查数据块的choices字段是否存在
                if chunk.choices:
                    # 获取当前数据块的内容,若不存在则设为空字符串
                    content = chunk.choices[0].delta.content or \"\"
                    # 按照SSE格式要求,添加data:前缀和双换行符,生成符合规范的流式数据
                    yield f\"data: {json.dumps({\'content\': content})}\\n\\n\"
        
        # 使用Flask的Response函数,将生成器函数的内容作为响应体,设置MIME类型为text/event-stream
        return Response(stream_with_context(generate()), mimetype=\'text/event-stream\')
    

    前端JavaScript代码这样写:

    // 创建一个EventSource对象,连接到后端的/stream接口
    const eventSource = new EventSource(\'/stream\');
    
    // 当接收到后端推送的消息时,执行该回调函数
    eventSource.onmessage = (event) => {
        // 解析接收到的JSON格式数据
        const data = JSON.parse(event.data);
        // 将解析后的数据中的content内容追加到id为output的HTML元素中
        document.getElementById(\'output\').innerHTML += data.content;
    };
    
    // 当EventSource连接出错时,执行该回调函数
    eventSource.onerror = (err) => {
        // 在控制台打印错误信息
        console.error(\'EventSource failed:\', err);
        // 关闭EventSource连接
        eventSource.close();
    };
    

    简单解释一下,SSE就像是后端给前端开了个“小窗口”,后端有新数据就通过这个“窗口”源源不断地推送给前端,前端收到数据后就能马上更新页面显示。

    (二)流式HTTP响应(NDJSON)方案

    这个方案通用性比较强,不仅适用于浏览器客户端,其他类型的客户端也能用。下面是FastAPI框架下的示例代码:

    # FastAPI 示例
    from fastapi import APIRouter
    from fastapi.responses import StreamingResponse
    import json
    
    @app.get(\"/stream\")
    async def stream_data():
        # 定义一个异步生成器函数generate,用于生成流式数据
        async def generate():
            # 调用Deepseek的API获取聊天完成的响应,设置stream=True开启流式传输
            response = client.chat.completions.create(
                model=\"deepseek-chat\",
                messages=messages,
                stream=True
            )
            
            # 异步遍历响应的每一个数据块
            async for chunk in response:
                # 检查数据块的choices字段是否存在
                if chunk.choices:
                    # 获取当前数据块的内容,若不存在则设为空字符串
                    content = chunk.choices[0].delta.content or \"\"
                    # 按照NDJSON格式要求,将内容转换为JSON字符串并添加换行符,生成符合规范的流式数据
                    yield json.dumps({\"content\": content}) + \"\\n\" 
        
        # 使用FastAPI的StreamingResponse函数,将生成器函数的内容作为响应体,设置媒体类型为application/x-ndjson
        return StreamingResponse(generate(), media_type=\'application/x-ndjson\')
    

    前端JavaScript可以用Fetch API来接收数据:

    async function streamData() {
        // 使用Fetch API发送GET请求到/stream接口
        const response = await fetch(\'/stream\');
        // 获取响应体的读取器
        const reader = response.body.getReader();
        // 创建一个文本解码器,用于将二进制数据转换为文本
        const decoder = new TextDecoder();
        
        // 循环读取数据
        while(true) {
            // 读取数据块
            const { done, value } = await reader.read();
            // 如果读取完成,跳出循环
            if(done) break;
            
            // 将读取到的二进制数据解码为文本
            const chunk = decoder.decode(value);
            // 解析JSON格式的文本数据
            const data = JSON.parse(chunk);
            // 将解析后的数据中的content内容追加到id为output的HTML元素中
            document.getElementById(\'output\').innerHTML += data.content;
        }
    }
    

    NDJSON方案就好比是把数据打成一个个“包裹”,每个“包裹”都是JSON格式的,然后通过HTTP协议一个一个地发给前端。

    二、关键配置说明

    (一)响应头设置

    Flask框架下需要手动设置响应头:

    # Flask
    headers = {
        \'Cache-Control\': \'no-cache\',
        \'Connection\': \'keep-alive\'
    }
    

    FastAPI框架会自动处理这些配置,不用咱们操心。响应头里的Cache-Control: no-cache是告诉浏览器不要缓存数据,每次都从服务器获取最新的;Connection: keep-alive则是让服务器和客户端之间的连接保持活跃,这样可以持续传输数据。

    (二)数据格式选择

    1. SSE(text/event-stream):浏览器原生支持,而且还有自动重连的功能。要是网络出了点小问题,它能自己尝试重新连接,保证数据接收不中断。
    2. NDJSON(application/x-ndjson):这是一种更通用的流式JSON格式,不管是啥类型的客户端都能接收,兼容性很强。
    3. 纯文本流:这种格式简单倒是简单,就是结构化能力有点弱,不太适合复杂的数据传输。

    (三)前端处理建议

    在前端接收数据的时候,可能会遇到分块不完整的情况。为了更健壮地处理这种问题,可以参考下面这段代码:

    // 定义一个缓冲区,用于存储不完整的数据块
    let buffer = \'\';
    
    async function processChunk(chunk) {
        // 将新接收到的数据块追加到缓冲区
        buffer += chunk;
        // 循环处理缓冲区中的数据,直到没有完整的行
        while(buffer.includes(\'\\n\')) {
            // 找到缓冲区中第一个换行符的位置
            const lineEnd = buffer.indexOf(\'\\n\');
            // 提取缓冲区中第一个完整的行
            const line = buffer.slice(0, lineEnd);
            // 更新缓冲区,去掉已经处理的行
            buffer = buffer.slice(lineEnd + 1);
            
            try {
                // 尝试解析JSON格式的行数据
                const data = JSON.parse(line);
                // 这里可以对解析后的数据进行具体处理
            } catch(e) {
                // 如果解析出错,在控制台打印错误信息
                console.error(\'解析错误:\', e);
            }
        }
    }
    

    这段代码的作用就是把接收到的数据先存到一个“小仓库”(buffer)里,然后每次从“小仓库”里找完整的“包裹”(以换行符分隔的数据行),找到就处理,没找到就接着等新的数据。

    三、完整工作流程示例(FastAPI + React)

    (一)后端代码

    # main.py
    from fastapi import FastAPI
    from fastapi.middleware.cors import CORSMiddleware
    
    app = FastAPI()
    
    # 添加CORS中间件,允许所有来源的请求访问API
    app.add_middleware(
        CORSMiddleware,
        allow_origins=[\"*\"],
        allow_methods=[\"*\"],
        allow_headers=[\"*\"],
    )
    
    @app.get(\"/chat\")
    async def chat_stream(prompt: str):
        # 定义一个异步生成器函数generate,用于生成流式数据
        async def generate():
            # 调用Deepseek的API获取聊天完成的响应,设置stream=True开启流式传输
            response = client.chat.completions.create(
                model=\"deepseek-chat\",
                messages=[{\"role\": \"user\", \"content\": prompt}],
                stream=True
            )
            
            # 异步遍历响应的每一个数据块
            async for chunk in response:
                # 检查数据块的content字段是否存在
                if content := chunk.choices[0].delta.content:
                    # 将内容转换为JSON字符串,生成符合规范的流式数据
                    yield json.dumps({\"content\": content})
        
        # 使用FastAPI的StreamingResponse函数,将生成器函数的内容作为响应体,设置媒体类型为application/x-ndjson
        return StreamingResponse(generate(), media_type=\"application/x-ndjson\")
    

    这段代码里,FastAPI创建了一个简单的API服务,CORSMiddleware是用来解决跨域问题的,让前端能顺利访问后端接口。chat_stream函数接收前端传来的prompt参数,调用Deepseek的API获取流式响应,再把响应数据返回给前端。

    (二)前端React组件代码

    // ChatComponent.jsx
    import { useState } from \'react\';
    
    export default function ChatComponent() {
        // 定义一个状态变量output,用于存储聊天响应内容,初始值为空字符串
        const [output, setOutput] = useState(\'\');
        
        const startStream = async () => {
            // 使用Fetch API发送GET请求到http://api/chat接口,并带上prompt参数
            const response = await fetch(\'http://api/chat?prompt=你好\');
            // 获取响应体的读取器
            const reader = response.body.getReader();
            // 创建一个文本解码器,用于将二进制数据转换为文本
            const decoder = new TextDecoder();
            // 定义一个缓冲区,用于存储不完整的数据块
            let buffer = \'\';
            
            // 循环读取数据
            while(true) {
                // 读取数据块
                const { done, value } = await reader.read();
                // 如果读取完成,跳出循环
                if(done) break;
                
                // 将读取到的二进制数据解码为文本,并追加到缓冲区
                buffer += decoder.decode(value);
                // 循环处理缓冲区中的数据,直到没有完整的JSON对象
                while(buffer.includes(\'}\')) {
                    // 找到缓冲区中第一个JSON对象结束的位置
                    const endIndex = buffer.indexOf(\'}\') + 1;
                    // 提取缓冲区中第一个完整的JSON对象
                    const chunk = buffer.slice(0, endIndex);
                    // 更新缓冲区,去掉已经处理的JSON对象
                    buffer = buffer.slice(endIndex);
                    
                    try {
                        // 尝试解析JSON格式的文本数据
                        const data = JSON.parse(chunk);
                        // 更新output状态,将解析后的数据中的content内容追加到原有内容后面
                        setOutput(prev => prev + data.content);
                    } catch(e) {
                        // 如果解析出错,在控制台打印错误信息
                        console.error(\'解析错误:\', e);
                    }
                }
            }
        };
        
        return (
            <div>
                {/* 定义一个按钮,点击时调用startStream函数 */}
                <button onClick={startStream}>开始对话</button>
                {/* 显示聊天响应内容的区域 */}
                <div id=\"output\">{output}</div>
            </div>
        );
    }
    

    在这个React组件里,点击“开始对话”按钮就会触发startStream函数,这个函数从后端获取流式数据,然后在前端解析并显示出来。

    四、注意事项

    (一)连接管理

    1. 设置合理的超时时间:一般设置在30 – 60秒比较合适。要是连接长时间没动静,就自动断开,避免资源浪费。
    2. 处理客户端提前断开连接的情况:在FastAPI里可以这样处理:
    # FastAPI 示例
    try:
        async for chunk in response:
            # 处理数据的代码
            if await request.is_disconnected():
                break
    finally:
        await client.close()  # 清理资源
    

    这段代码在处理数据的过程中,会检查客户端是不是断开连接了,如果断开就停止处理,最后还会清理相关资源。

    (二)性能优化

    1. 使用异步框架:像FastAPI的性能就比Flask要好一些。异步框架能让程序在等待某些操作完成的时候,去做其他事情,提高效率。
    2. 启用响应压缩:可以在FastAPI里通过中间件来实现:
    app = FastAPI()
    @app.middleware(\"http\")
    async def add_compression(request, call_next):
        response = await call_next(request)
        response.headers[\"Content-Encoding\"] = \"gzip\"
        return response
    

    这段代码给响应头添加了Content-Encoding: gzip,这样数据在传输前会被压缩,能减少传输的数据量,提高传输速度。

    (三)安全考虑

    1. 限制最大并发连接数:防止太多客户端同时连接,把服务器挤爆了。
    2. 实施速率限制:比如限制每个客户端每分钟只能请求一定次数,防止恶意请求。在FastAPI里可以借助slowapi库来实现:
    from fastapi import Request
    from fastapi.middleware import Middleware
    from slowapi import Limiter
    from slowapi.util import get_remote_address
    
    limiter = Limiter(key_func=get_remote_address)
    app.state.limiter = limiter
    
    @app.get(\"/chat\")
    @limiter.limit(\"10/minute\")
    async def chat_stream(request: Request):
        # 处理请求的代码
    

    这段代码通过limiter.limit(\"10/minute\")限制了每个客户端每分钟最多只能请求10次。

    (四)错误处理增强

    在生成流式数据的时候,最好加上错误处理,比如这样:

    async def generate():
        try:
            response = client.chat.completions.create(...)
            async for chunk in response:
                # 处理数据的代码
        except Exception as e:
            yield json.dumps({\"error\": str(e)})
        finally:
            await client.close()  # 确保释放资源
    

    这样就算在调用API或者处理数据的过程中出了问题,也能给前端返回错误信息,还能保证资源被正确释放。

    总的来说,这些方案可以根据实际需求灵活组合使用。如果只是面向浏览器端的应用,优先选SSE方案;要是需要支持更复杂的场景,WebSocket也是个不错的选择,不过实现起来稍微麻烦点。希望这篇文章能帮到正在研究这个问题的小伙伴们!

微信扫一扫

支付宝扫一扫

版权: 转载请注明出处:https://www.zuozi.net/6834.html

管理员

相关推荐
2025-08-06

文章目录 一、Promise基础回顾 二、Promise 与 axios 结合使用场景及方法 (一)直接返回 axios …

270
2025-08-06

文章目录 一、模块初始化时的内部机制 二、常见导出写法的差异分析 (一)写法一:module.exports…

108
2025-08-06

文章目录 一、ResizeObserver详解 (一)ResizeObserver是什么 (二)ResizeObserver的基本用法 …

684
2025-08-06

文章目录 一、前期准备工作 (一)下载相关文件 (二)安装必要工具 二、处理扣子空间生成的文件…

340
2025-08-06

文章目录 一、官方文档 二、自动解包的数据类型 ref对象:无需.value即可访问 reactive对象:保持…

371
2025-08-06

文章目录 一、Hooks的工作原理 二、在if语句中使用Hook会出什么岔子? 三、React官方的Hook使用规…

844
发表评论
暂无评论

还没有评论呢,快来抢沙发~

助力内容变现

将您的收入提升到一个新的水平

点击联系客服

在线时间:08:00-23:00

客服QQ

122325244

客服电话

400-888-8888

客服邮箱

122325244@qq.com

扫描二维码

关注微信客服号