Streaming lets your application receive tokens in real time as the AI generates them, instead of waiting for the full response to complete. This significantly reduces time to first token and makes the interaction feel more responsive.
Enabling Streaming
Set "stream": true in the request body to receive Server-Sent Events (SSE):
Python (OpenAI SDK)

```python
from openai import OpenAI

client = OpenAI(
    base_url="https://api.modelriver.com/v1",
    api_key="mr_live_YOUR_API_KEY"
)

stream = client.chat.completions.create(
    model="my_workflow",
    messages=[{"role": "user", "content": "Tell me a story"}],
    stream=True
)

for chunk in stream:
    if chunk.choices[0].delta.content:
        print(chunk.choices[0].delta.content, end="")
```

Node.js (OpenAI SDK)
```javascript
import OpenAI from "openai";

const client = new OpenAI({
  baseURL: "https://api.modelriver.com/v1",
  apiKey: "mr_live_YOUR_API_KEY",
});

const stream = await client.chat.completions.create({
  model: "my_workflow",
  messages: [{ role: "user", content: "Tell me a story" }],
  stream: true,
});

for await (const chunk of stream) {
  const content = chunk.choices[0]?.delta?.content;
  if (content) process.stdout.write(content);
}
```

Native API (cURL)
```bash
curl -X POST https://api.modelriver.com/v1/ai \
  -H "Authorization: Bearer mr_live_your_key" \
  -H "Content-Type: application/json" \
  -N \
  -d '{
    "workflow": "my-workflow",
    "messages": [{"role": "user", "content": "Tell me a story"}],
    "stream": true
  }'
```

SSE Event Format
Each Server-Sent Event follows the OpenAI delta format:
```
data: {"id":"chatcmpl-abc","object":"chat.completion.chunk","choices":[{"index":0,"delta":{"role":"assistant"},"finish_reason":null}]}

data: {"id":"chatcmpl-abc","object":"chat.completion.chunk","choices":[{"index":0,"delta":{"content":"Hello"},"finish_reason":null}]}

data: {"id":"chatcmpl-abc","object":"chat.completion.chunk","choices":[{"index":0,"delta":{"content":" world"},"finish_reason":null}]}

data: {"id":"chatcmpl-abc","object":"chat.completion.chunk","choices":[{"index":0,"delta":{},"finish_reason":"stop"}]}

data: [DONE]
```

Chunk Structure
| Field | Description |
|---|---|
| `id` | All chunks in a single stream share the same ID |
| `object` | Always `"chat.completion.chunk"` |
| `choices[0].delta.role` | Usually `"assistant"` in the first chunk |
| `choices[0].delta.content` | Incremental text content |
| `choices[0].finish_reason` | `null` while streaming; `"stop"` on the final content chunk |
| `[DONE]` | Sentinel value marking the end of the stream |
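The chunk fields above can be reassembled client-side without an SDK. A minimal sketch (standard library only; `accumulate_sse` is a hypothetical helper, not part of the ModelRiver API) that parses raw `data:` lines and joins the delta content fragments:

```python
import json

def accumulate_sse(lines):
    """Parse raw SSE 'data:' lines and join the delta content fragments."""
    parts = []
    for line in lines:
        line = line.strip()
        if not line.startswith("data:"):
            continue  # skip blank lines and ": heartbeat" comments
        payload = line[len("data:"):].strip()
        if payload == "[DONE]":
            break  # sentinel: the stream is finished
        chunk = json.loads(payload)
        delta = chunk["choices"][0]["delta"]
        if "content" in delta:
            parts.append(delta["content"])
    return "".join(parts)

# The example events from the section above:
events = [
    'data: {"id":"chatcmpl-abc","object":"chat.completion.chunk","choices":[{"index":0,"delta":{"role":"assistant"},"finish_reason":null}]}',
    'data: {"id":"chatcmpl-abc","object":"chat.completion.chunk","choices":[{"index":0,"delta":{"content":"Hello"},"finish_reason":null}]}',
    'data: {"id":"chatcmpl-abc","object":"chat.completion.chunk","choices":[{"index":0,"delta":{"content":" world"},"finish_reason":null}]}',
    'data: {"id":"chatcmpl-abc","object":"chat.completion.chunk","choices":[{"index":0,"delta":{},"finish_reason":"stop"}]}',
    'data: [DONE]',
]
print(accumulate_sse(events))  # Hello world
```

Note that the first chunk carries only a `role` and no `content`, so the accumulator must check for the key rather than assume it is present.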
Streaming Behavior
Heartbeats
ModelRiver sends a heartbeat comment (`: heartbeat`) every 15 seconds to keep the connection alive and prevent proxies and load balancers from closing idle connections.
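Per the SSE format, lines beginning with a colon are comments and carry no event data, so a hand-rolled client should drop them rather than try to parse them as JSON. A minimal filter sketch (hypothetical helper, not part of the API):

```python
def is_sse_comment(line: str) -> bool:
    """SSE comment lines (e.g. ': heartbeat') start with a colon and carry no data."""
    return line.startswith(":")

# Heartbeats interleaved with a data event:
lines = [": heartbeat", 'data: {"choices":[]}', ": heartbeat"]
data_lines = [l for l in lines if not is_sse_comment(l)]
print(data_lines)  # ['data: {"choices":[]}']
```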
Timeouts
Streaming requests are limited to 5 minutes. If the provider has not started generating content within this window, the stream ends with an error event.
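A client can also enforce its own, tighter budget on a stream. A minimal sketch (hypothetical `take_until_deadline` helper, standard library only) that stops consuming chunks once a wall-clock budget is spent, independent of the 5-minute server-side limit:

```python
import time

def take_until_deadline(chunks, budget_s):
    """Yield items from a chunk iterator until a wall-clock budget is spent."""
    deadline = time.monotonic() + budget_s
    for chunk in chunks:
        if time.monotonic() > deadline:
            break  # budget exhausted; caller can close the connection
        yield chunk

# Usage with a plain iterable standing in for the SDK stream object:
consumed = list(take_until_deadline(iter(["a", "b", "c"]), budget_s=5.0))
print(consumed)  # ['a', 'b', 'c']
```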
End of Stream
The stream always ends with `data: [DONE]`. Always check for this sentinel value so you can close the connection and run your cleanup logic correctly.
Error Handling During Streaming
If an error occurs mid-stream, ModelRiver sends an error event before closing the connection:
```
data: {"error":{"message":"Provider timeout","type":"upstream_error","code":"timeout"}}

data: [DONE]
```

Handling Stream Errors
```python
try:
    stream = client.chat.completions.create(
        model="my_workflow",
        messages=[{"role": "user", "content": "Hello"}],
        stream=True
    )

    for chunk in stream:
        if chunk.choices[0].delta.content:
            print(chunk.choices[0].delta.content, end="")
except Exception as e:
    print(f"Stream error: {e}")
    # Implement retry logic or fallback
```
```javascript
try {
  const stream = await client.chat.completions.create({
    model: "my_workflow",
    messages: [{ role: "user", content: "Hello" }],
    stream: true,
  });

  for await (const chunk of stream) {
    const content = chunk.choices[0]?.delta?.content;
    if (content) process.stdout.write(content);
  }
} catch (error) {
  console.error("Stream error:", error.message);
  // Implement retry logic or fallback
}
```
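The "retry logic" placeholder in the examples above could look like this minimal sketch: a hypothetical `stream_with_retry` wrapper (not part of the ModelRiver API) that retries a failed stream with exponential backoff, standard library only:

```python
import time

def stream_with_retry(run_stream, max_attempts=3, base_delay_s=1.0):
    """Call run_stream(); on failure, retry with exponential backoff."""
    for attempt in range(max_attempts):
        try:
            return run_stream()
        except Exception:
            if attempt == max_attempts - 1:
                raise  # out of attempts: surface the last error
            time.sleep(base_delay_s * (2 ** attempt))

# Usage with a stand-in that fails twice, then succeeds:
calls = {"n": 0}
def flaky():
    calls["n"] += 1
    if calls["n"] < 3:
        raise RuntimeError("Provider timeout")
    return "ok"

print(stream_with_retry(flaky, base_delay_s=0.01))  # ok
```

In practice `run_stream` would wrap the `client.chat.completions.create(..., stream=True)` call plus the consuming loop, since an error can arrive mid-stream rather than at request time.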