FastAPI + ModelRiver

High-performance AI APIs with async Python. REST endpoints, WebSocket streaming, and background task processing: all routed through ModelRiver.

Overview

FastAPI is the fastest-growing Python API framework, built on async/await. Combined with ModelRiver, you can build powerful AI REST APIs, WebSocket streaming endpoints, and background processing pipelines with minimal code.


Getting started

Install dependencies

Bash
pip install fastapi uvicorn openai python-dotenv

Basic AI endpoint

Python
# main.py
from fastapi import FastAPI
from openai import AsyncOpenAI
from pydantic import BaseModel
import os

app = FastAPI()

client = AsyncOpenAI(
    base_url="https://api.modelriver.com/v1",
    api_key=os.environ["MODELRIVER_API_KEY"],
)

class ChatRequest(BaseModel):
    message: str
    workflow: str = "my-chat-workflow"

class ChatResponse(BaseModel):
    content: str
    model: str
    tokens: int

@app.post("/chat", response_model=ChatResponse)
async def chat(req: ChatRequest):
    response = await client.chat.completions.create(
        model=req.workflow,
        messages=[{"role": "user", "content": req.message}],
    )

    return ChatResponse(
        content=response.choices[0].message.content,
        model=response.model,
        tokens=response.usage.total_tokens,
    )

Run the server:

Bash
uvicorn main:app --reload

Streaming with SSE

Python
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from openai import AsyncOpenAI
from pydantic import BaseModel
import os

app = FastAPI()
client = AsyncOpenAI(
    base_url="https://api.modelriver.com/v1",
    api_key=os.environ["MODELRIVER_API_KEY"],
)

class ChatRequest(BaseModel):
    message: str
    workflow: str = "my-chat-workflow"

@app.post("/chat/stream")
async def chat_stream(req: ChatRequest):
    async def generate():
        stream = await client.chat.completions.create(
            model=req.workflow,
            messages=[{"role": "user", "content": req.message}],
            stream=True,
        )
        async for chunk in stream:
            content = chunk.choices[0].delta.content
            if content:
                yield f"data: {content}\n\n"
        yield "data: [DONE]\n\n"

    return StreamingResponse(generate(), media_type="text/event-stream")
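
On the wire, each chunk arrives as a `data: ...` line followed by a blank line. A minimal, stdlib-only sketch of how a client might parse the stream this endpoint emits (the helper name `parse_sse_data` is ours, not part of any library):

```python
def parse_sse_data(raw: str) -> list[str]:
    """Split a text/event-stream payload into its data fields."""
    events = []
    for block in raw.split("\n\n"):
        for line in block.splitlines():
            if line.startswith("data: "):
                events.append(line[len("data: "):])
    return events

# frames as the /chat/stream endpoint above would emit them
raw = "data: Hel\n\ndata: lo!\n\ndata: [DONE]\n\n"
chunks = parse_sse_data(raw)
text = "".join(c for c in chunks if c != "[DONE]")
```

In practice you would feed this parser from an HTTP client that exposes the response body incrementally, accumulating partial lines until a blank line closes each event.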

WebSocket chat

Python
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from openai import AsyncOpenAI
import json, os

app = FastAPI()
client = AsyncOpenAI(
    base_url="https://api.modelriver.com/v1",
    api_key=os.environ["MODELRIVER_API_KEY"],
)

@app.websocket("/ws/chat")
async def websocket_chat(ws: WebSocket):
    await ws.accept()
    messages = []

    try:
        while True:
            data = await ws.receive_text()
            user_msg = json.loads(data)
            messages.append({"role": "user", "content": user_msg["content"]})

            stream = await client.chat.completions.create(
                model="my-chat-workflow",
                messages=messages,
                stream=True,
            )

            full_response = ""
            async for chunk in stream:
                content = chunk.choices[0].delta.content
                if content:
                    full_response += content
                    await ws.send_text(json.dumps({"type": "chunk", "content": content}))

            messages.append({"role": "assistant", "content": full_response})
            await ws.send_text(json.dumps({"type": "done"}))

    except WebSocketDisconnect:
        pass
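
A client of `/ws/chat` receives JSON frames of type `chunk` until a `done` frame arrives. A stdlib-only sketch of reassembling them (the helper `collect_response` is hypothetical, not a library API):

```python
import json

def collect_response(frames: list[str]) -> str:
    """Concatenate chunk frames from the /ws/chat protocol until 'done'."""
    text = ""
    for frame in frames:
        msg = json.loads(frame)
        if msg["type"] == "chunk":
            text += msg["content"]
        elif msg["type"] == "done":
            break
    return text

# frames as the websocket handler above would send them
frames = [
    json.dumps({"type": "chunk", "content": "Hi "}),
    json.dumps({"type": "chunk", "content": "there"}),
    json.dumps({"type": "done"}),
]
reply = collect_response(frames)
```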

Background tasks

Python
from fastapi import FastAPI, BackgroundTasks
from openai import AsyncOpenAI
import os

app = FastAPI()
client = AsyncOpenAI(
    base_url="https://api.modelriver.com/v1",
    api_key=os.environ["MODELRIVER_API_KEY"],
)

async def process_document(doc_id: str, content: str):
    """Background task: summarise a document."""
    response = await client.chat.completions.create(
        model="my-summary-workflow",
        messages=[
            {"role": "system", "content": "Summarise the following document."},
            {"role": "user", "content": content},
        ],
    )
    summary = response.choices[0].message.content
    # Save the summary to the database...
    print(f"Summary for document {doc_id} complete: {summary[:100]}")

@app.post("/documents/{doc_id}/summarise")
async def summarise_document(doc_id: str, background_tasks: BackgroundTasks):
    content = "..."  # fetched from the database
    background_tasks.add_task(process_document, doc_id, content)
    return {"status": "processing", "doc_id": doc_id}
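
`BackgroundTasks` runs the task after the response has been sent, so the client never waits for the AI call. The same fire-and-forget shape can be sketched with plain `asyncio` (the names here are illustrative, not FastAPI internals):

```python
import asyncio

async def process_document(doc_id: str, content: str) -> str:
    # stand-in for the AI summarisation call above
    await asyncio.sleep(0)
    return f"summary of {doc_id}"

async def summarise_endpoint(doc_id: str):
    # schedule the heavy work, then respond immediately
    task = asyncio.create_task(process_document(doc_id, "..."))
    return {"status": "processing", "doc_id": doc_id}, task

async def main():
    response, task = await summarise_endpoint("doc-1")
    summary = await task  # with FastAPI this work happens after the response goes out
    return response, summary

response, summary = asyncio.run(main())
```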

Dependency injection

Python
from fastapi import FastAPI, Depends
from openai import AsyncOpenAI
from pydantic import BaseModel
import os

app = FastAPI()

class ChatRequest(BaseModel):
    message: str
    workflow: str = "my-chat-workflow"

def get_ai_client() -> AsyncOpenAI:
    return AsyncOpenAI(
        base_url="https://api.modelriver.com/v1",
        api_key=os.environ["MODELRIVER_API_KEY"],
    )

@app.post("/chat")
async def chat(req: ChatRequest, ai: AsyncOpenAI = Depends(get_ai_client)):
    response = await ai.chat.completions.create(
        model=req.workflow,
        messages=[{"role": "user", "content": req.message}],
    )
    return {"content": response.choices[0].message.content}
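
One payoff of injecting the client is testability: in tests, FastAPI's `app.dependency_overrides` can swap `get_ai_client` for a stub. A stdlib-only sketch of such a stub, mimicking just the slice of the `AsyncOpenAI` surface the endpoint touches (the `FakeAIClient` class is hypothetical):

```python
import asyncio
from types import SimpleNamespace

class FakeAIClient:
    """Stub exposing client.chat.completions.create like AsyncOpenAI."""
    def __init__(self):
        self.chat = SimpleNamespace(
            completions=SimpleNamespace(create=self._create)
        )

    async def _create(self, model, messages, **kwargs):
        # echo the last user message instead of hitting the network
        reply = f"echo: {messages[-1]['content']}"
        choice = SimpleNamespace(message=SimpleNamespace(content=reply))
        return SimpleNamespace(choices=[choice], model=model)

async def demo():
    ai = FakeAIClient()
    response = await ai.chat.completions.create(
        model="my-chat-workflow",
        messages=[{"role": "user", "content": "hi"}],
    )
    return response.choices[0].message.content

result = asyncio.run(demo())
```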

Best practices

  1. Use AsyncOpenAI: FastAPI is async-native; always use the async client.
  2. Add error handling: catch openai.APIError and return meaningful HTTP error responses to users.
  3. Use dependency injection: paired with a mock client, it makes testing much easier.
  4. Stream long responses: use SSE or WebSockets to give user-facing endpoints a streaming experience.
  5. Use background tasks for batch work: don't let heavy AI calls block the request handler.
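
Practice 2 in code: catch the upstream error and translate it into an HTTP error rather than letting it surface as a 500. A self-contained sketch, where `UpstreamAIError` stands in for `openai.APIError` and the returned dict stands in for raising `HTTPException`:

```python
class UpstreamAIError(Exception):
    """Stand-in for openai.APIError in this self-contained sketch."""

def handle_chat(call):
    try:
        return {"content": call()}
    except UpstreamAIError as exc:
        # in a real handler: raise HTTPException(status_code=502, detail=str(exc))
        return {"status_code": 502, "detail": str(exc)}

def failing_call():
    raise UpstreamAIError("upstream timeout")

ok = handle_chat(lambda: "hello")
bad = handle_chat(failing_call)
```

Returning a 502 tells clients the failure was upstream of your service; 500 would suggest a bug in your own handler.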

Next steps