概览
FastAPI 是基于 async/await 构建的、增长最快的 Python API 框架。结合 ModelRiver,您可以用最少的代码构建强大的 AI REST API、WebSocket 流式传输端点以及后台处理流水线。
快速上手
安装相关依赖
Bash
pip install fastapi uvicorn openai python-dotenv

基础 AI 端点
PYTHON
1# main.py2from fastapi import FastAPI3from openai import AsyncOpenAI4from pydantic import BaseModel5import os6 7app = FastAPI()8 9client = AsyncOpenAI(10 base_url="https://api.modelriver.com/v1",11 api_key=os.environ["MODELRIVER_API_KEY"],12)13 14class ChatRequest(BaseModel):15 message: str16 workflow: str = "my-chat-workflow"17 18class ChatResponse(BaseModel):19 content: str20 model: str21 tokens: int22 23@app.post("/chat", response_model=ChatResponse)24async def chat(req: ChatRequest):25 response = await client.chat.completions.create(26 model=req.workflow,27 messages=[{"role": "user", "content": req.message}],28 )29 30 return ChatResponse(31 content=response.choices[0].message.content,32 model=response.model,33 tokens=response.usage.total_tokens,34 )运行服务器:
Bash
uvicorn main:app --reload

SSE 流式传输 (Streaming with SSE)
PYTHON
1from fastapi import FastAPI2from fastapi.responses import StreamingResponse3from openai import AsyncOpenAI4import os5 6app = FastAPI()7client = AsyncOpenAI(8 base_url="https://api.modelriver.com/v1",9 api_key=os.environ["MODELRIVER_API_KEY"],10)11 12@app.post("/chat/stream")13async def chat_stream(req: ChatRequest):14 async def generate():15 stream = await client.chat.completions.create(16 model=req.workflow,17 messages=[{"role": "user", "content": req.message}],18 stream=True,19 )20 async for chunk in stream:21 content = chunk.choices[0].delta.content22 if content:23 yield f"data: {content}\n\n"24 yield "data: [DONE]\n\n"25 26 return StreamingResponse(generate(), media_type="text/event-stream")WebSocket 聊天
PYTHON
1from fastapi import FastAPI, WebSocket, WebSocketDisconnect2from openai import AsyncOpenAI3import json, os4 5app = FastAPI()6client = AsyncOpenAI(7 base_url="https://api.modelriver.com/v1",8 api_key=os.environ["MODELRIVER_API_KEY"],9)10 11@app.websocket("/ws/chat")12async def websocket_chat(ws: WebSocket):13 await ws.accept()14 messages = []15 16 try:17 while True:18 data = await ws.receive_text()19 user_msg = json.loads(data)20 messages.append({"role": "user", "content": user_msg["content"]})21 22 stream = await client.chat.completions.create(23 model="my-chat-workflow",24 messages=messages,25 stream=True,26 )27 28 full_response = ""29 async for chunk in stream:30 content = chunk.choices[0].delta.content31 if content:32 full_response += content33 await ws.send_text(json.dumps({"type": "chunk", "content": content}))34 35 messages.append({"role": "assistant", "content": full_response})36 await ws.send_text(json.dumps({"type": "done"}))37 38 except WebSocketDisconnect:39 pass后台任务 (Background tasks)
PYTHON
1from fastapi import FastAPI, BackgroundTasks2from openai import AsyncOpenAI3import os4 5app = FastAPI()6client = AsyncOpenAI(7 base_url="https://api.modelriver.com/v1",8 api_key=os.environ["MODELRIVER_API_KEY"],9)10 11async def process_document(doc_id: str, content: str):12 """后台任务:总结文档。"""13 response = await client.chat.completions.create(14 model="my-summary-workflow",15 messages=[16 {"role": "system", "content": "请对以下文档进行总结。"},17 {"role": "user", "content": content},18 ],19 )20 summary = response.choices[0].message.content21 # 将总结保存至数据库...22 print(f"文档 {doc_id} 的总结已完成:{summary[:100]}")23 24@app.post("/documents/{doc_id}/summarise")25async def summarise_document(doc_id: str, background_tasks: BackgroundTasks):26 content = "..." # 从数据库获取27 background_tasks.add_task(process_document, doc_id, content)28 return {"status": "processing", "doc_id": doc_id}依赖注入 (Dependency injection)
PYTHON
1from fastapi import Depends2from openai import AsyncOpenAI3import os4 5def get_ai_client() -> AsyncOpenAI:6 return AsyncOpenAI(7 base_url="https://api.modelriver.com/v1",8 api_key=os.environ["MODELRIVER_API_KEY"],9 )10 11@app.post("/chat")12async def chat(req: ChatRequest, ai: AsyncOpenAI = Depends(get_ai_client)):13 response = await ai.chat.completions.create(14 model=req.workflow,15 messages=[{"role": "user", "content": req.message}],16 )17 return {"content": response.choices[0].message.content}最佳实践 (Best practices)
- 使用 AsyncOpenAI: FastAPI 原生支持异步;请务必使用异步客户端。
- 添加错误处理: 捕获 openai.APIError 并向用户返回有意义的 HTTP 错误响应。
- 使用依赖注入: 配合模拟 (mock) 客户端,这会让测试变得更容易。
- 长响应时使用流式传输: 面向用户的端点请使用 SSE 或 WebSocket 来提供流式体验。
- 对批处理使用后台任务: 不要让繁重的 AI 调用阻塞了请求处理器。