Real-Time Streaming with SSE

Stream AI responses token by token to dramatically improve perceived performance. Covers heartbeats, timeout handling, and examples in Python and Node.js.

Streaming lets your application receive tokens as the AI generates them, instead of waiting for the full response to complete. This significantly reduces time to first token and makes the interaction feel far more responsive.

Enabling Streaming

Set "stream": true in the request body to receive Server-Sent Events (SSE):

Python (OpenAI SDK)

PYTHON
from openai import OpenAI

client = OpenAI(
    base_url="https://api.modelriver.com/v1",
    api_key="mr_live_YOUR_API_KEY"
)

stream = client.chat.completions.create(
    model="my_workflow",
    messages=[{"role": "user", "content": "Tell me a story"}],
    stream=True
)

for chunk in stream:
    if chunk.choices[0].delta.content:
        print(chunk.choices[0].delta.content, end="")

Node.js (OpenAI SDK)

JAVASCRIPT
import OpenAI from "openai";

const client = new OpenAI({
  baseURL: "https://api.modelriver.com/v1",
  apiKey: "mr_live_YOUR_API_KEY",
});

const stream = await client.chat.completions.create({
  model: "my_workflow",
  messages: [{ role: "user", content: "Tell me a story" }],
  stream: true,
});

for await (const chunk of stream) {
  const content = chunk.choices[0]?.delta?.content;
  if (content) process.stdout.write(content);
}

Native API (cURL)

Bash
curl -X POST https://api.modelriver.com/v1/ai \
  -H "Authorization: Bearer mr_live_your_key" \
  -H "Content-Type: application/json" \
  -N \
  -d '{
    "workflow": "my-workflow",
    "messages": [{"role": "user", "content": "Tell me a story"}],
    "stream": true
  }'
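If you are not using an SDK, the stream from the cURL example can also be consumed by hand. Below is a minimal sketch, assuming the third-party `requests` library; the function names (`iter_content`, `stream_native`) are illustrative, not part of any SDK. Parsing is split out so it works over any iterable of raw SSE lines:

```python
import json

def iter_content(lines):
    """Yield text deltas from raw SSE lines in the OpenAI delta format."""
    for line in lines:
        if not line.startswith("data: "):   # skips blanks and ": heartbeat" comments
            continue
        payload = line[len("data: "):]
        if payload == "[DONE]":             # end-of-stream sentinel
            return
        delta = json.loads(payload)["choices"][0]["delta"]
        if delta.get("content"):
            yield delta["content"]

def stream_native(api_key, workflow, prompt):
    """Call the native endpoint and yield content deltas as they arrive."""
    import requests  # third-party: pip install requests
    resp = requests.post(
        "https://api.modelriver.com/v1/ai",
        headers={"Authorization": f"Bearer {api_key}"},
        json={
            "workflow": workflow,
            "messages": [{"role": "user", "content": prompt}],
            "stream": True,
        },
        stream=True,  # read the body incrementally instead of buffering it all
    )
    resp.raise_for_status()
    return iter_content(resp.iter_lines(decode_unicode=True))
```

Keeping `iter_content` pure makes it easy to unit-test against canned SSE lines without opening a connection.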

SSE Event Format

Each Server-Sent Event follows the OpenAI delta format:

data: {"id":"chatcmpl-abc","object":"chat.completion.chunk","choices":[{"index":0,"delta":{"role":"assistant"},"finish_reason":null}]}
 
data: {"id":"chatcmpl-abc","object":"chat.completion.chunk","choices":[{"index":0,"delta":{"content":"Hello"},"finish_reason":null}]}
 
data: {"id":"chatcmpl-abc","object":"chat.completion.chunk","choices":[{"index":0,"delta":{"content":" world"},"finish_reason":null}]}
 
data: {"id":"chatcmpl-abc","object":"chat.completion.chunk","choices":[{"index":0,"delta":{},"finish_reason":"stop"}]}
 
data: [DONE]

Chunk Structure

Field                      Description
id                         All chunks in one stream share the same ID
object                     Always "chat.completion.chunk"
choices[0].delta.role      Typically "assistant" in the first chunk
choices[0].delta.content   Incremental text content
choices[0].finish_reason   null while streaming, "stop" on the final content chunk
[DONE]                     Sentinel value marking the end of the stream
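Clients typically fold these chunks back into a single message. A sketch of that reassembly, using only the fields described above (`accumulate` is an illustrative name, not an SDK function); it takes already-parsed chunk dicts:

```python
def accumulate(chunks):
    """Fold delta chunks into one complete message.

    Returns (message, finish_reason): the role arrives in the first
    chunk, content fragments are concatenated in order, and the
    finish_reason appears on the final content chunk.
    """
    message = {"role": None, "content": ""}
    finish_reason = None
    for chunk in chunks:
        choice = chunk["choices"][0]
        delta = choice["delta"]
        if "role" in delta:
            message["role"] = delta["role"]
        if delta.get("content"):
            message["content"] += delta["content"]
        if choice.get("finish_reason"):
            finish_reason = choice["finish_reason"]
    return message, finish_reason
```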

Streaming Behavior

Heartbeats

ModelRiver sends a heartbeat comment (: heartbeat) every 15 seconds to keep the connection alive and prevent proxies and load balancers from closing idle connections.

Timeouts

Streaming requests are capped at 5 minutes. If the provider has not started generating content within that window, the stream ends with an error event.
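You may also want your own, possibly shorter, budget on the client side. A minimal sketch of a wall-clock guard that wraps any chunk iterator (`take_until_deadline` is a hypothetical helper, not part of any SDK):

```python
import time

def take_until_deadline(chunks, max_seconds=300):
    """Re-yield chunks, raising TimeoutError once a wall-clock budget is spent.

    The 300s default mirrors the server's 5-minute cap; pass a smaller
    value to fail faster. Works with any iterable of chunks.
    """
    deadline = time.monotonic() + max_seconds
    for chunk in chunks:
        if time.monotonic() > deadline:
            raise TimeoutError(f"stream exceeded {max_seconds}s budget")
        yield chunk
```

Note this only checks the clock between chunks; a connection that hangs without delivering anything also needs a read timeout on the underlying HTTP client.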

Stream Termination

Every stream ends with data: [DONE]. Always check for this sentinel so you can close the connection and run any cleanup logic.


Error Handling Mid-Stream

If an error occurs while streaming, ModelRiver sends an error event before closing the connection:

data: {"error":{"message":"Provider timeout","type":"upstream_error","code":"timeout"}}
 
data: [DONE]
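When parsing the stream by hand, this means a `data:` payload can be one of three things: the sentinel, an error object, or a normal chunk. A small classifier sketch (`parse_sse_data` is an illustrative name; the return shape is an assumption of this example, not an API contract):

```python
import json

def parse_sse_data(payload: str):
    """Classify one SSE `data:` payload from the stream.

    Returns a (kind, value) tuple:
      ("done", None)        for the [DONE] sentinel
      ("error", error_dict) for an error event
      ("chunk", parsed)     for a normal delta chunk
    """
    if payload == "[DONE]":
        return ("done", None)
    obj = json.loads(payload)
    if "error" in obj:
        return ("error", obj["error"])
    return ("chunk", obj)
```

Checking for the error shape before indexing into choices avoids a KeyError when the stream ends abnormally.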

Handling Stream Errors

PYTHON
try:
    stream = client.chat.completions.create(
        model="my_workflow",
        messages=[{"role": "user", "content": "Hello"}],
        stream=True
    )

    for chunk in stream:
        if chunk.choices[0].delta.content:
            print(chunk.choices[0].delta.content, end="")
except Exception as e:
    print(f"Stream error: {e}")
    # Implement retry logic or fallback
JAVASCRIPT
try {
  const stream = await client.chat.completions.create({
    model: "my_workflow",
    messages: [{ role: "user", content: "Hello" }],
    stream: true,
  });

  for await (const chunk of stream) {
    const content = chunk.choices[0]?.delta?.content;
    if (content) process.stdout.write(content);
  }
} catch (error) {
  console.error("Stream error:", error.message);
  // Implement retry logic or fallback
}

Next Steps