Overview
FastAPI is a modern, high-performance Python API framework built around async/await. Combined with ModelRiver, you can build AI-powered REST APIs, WebSocket streaming endpoints, and background processing pipelines with minimal code.
Quick start
Install dependencies
Bash
pip install fastapi uvicorn openai python-dotenv
Basic AI endpoint
PYTHON
# main.py
from fastapi import FastAPI
from openai import AsyncOpenAI
from pydantic import BaseModel
import os

app = FastAPI()

client = AsyncOpenAI(
    base_url="https://api.modelriver.com/v1",
    api_key=os.environ["MODELRIVER_API_KEY"],
)

class ChatRequest(BaseModel):
    message: str
    workflow: str = "my-chat-workflow"

class ChatResponse(BaseModel):
    content: str
    model: str
    tokens: int

@app.post("/chat", response_model=ChatResponse)
async def chat(req: ChatRequest):
    response = await client.chat.completions.create(
        model=req.workflow,
        messages=[{"role": "user", "content": req.message}],
    )

    return ChatResponse(
        content=response.choices[0].message.content,
        model=response.model,
        tokens=response.usage.total_tokens,
    )
Bash
uvicorn main:app --reload
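With the server running, you can try the endpoint from any HTTP client. Below is a minimal sketch using httpx (not in the install command above, so add it with pip install httpx); the URL assumes uvicorn's default port and the message text is just an example.
PYTHON
# Sketch of a client call against the /chat endpoint (httpx is an extra dependency)
import httpx

resp = httpx.post(
    "http://localhost:8000/chat",
    json={"message": "Explain async/await in one sentence."},
    timeout=60.0,
)
resp.raise_for_status()
print(resp.json())  # {"content": "...", "model": "...", "tokens": ...}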
Streaming with SSE
PYTHON
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from openai import AsyncOpenAI
import os

# ChatRequest is the request model defined in the basic endpoint example above
app = FastAPI()
client = AsyncOpenAI(
    base_url="https://api.modelriver.com/v1",
    api_key=os.environ["MODELRIVER_API_KEY"],
)

@app.post("/chat/stream")
async def chat_stream(req: ChatRequest):
    async def generate():
        stream = await client.chat.completions.create(
            model=req.workflow,
            messages=[{"role": "user", "content": req.message}],
            stream=True,
        )
        async for chunk in stream:
            content = chunk.choices[0].delta.content
            if content:
                yield f"data: {content}\n\n"
        yield "data: [DONE]\n\n"

    return StreamingResponse(generate(), media_type="text/event-stream")
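On the client side, the stream arrives as Server-Sent Events. A minimal sketch of consuming it with httpx (the prompt text is illustrative):
PYTHON
# Sketch of reading the SSE stream line by line with httpx
import httpx

with httpx.stream(
    "POST",
    "http://localhost:8000/chat/stream",
    json={"message": "Write a haiku about rivers."},
    timeout=None,
) as resp:
    for line in resp.iter_lines():
        if not line.startswith("data: "):
            continue
        data = line[len("data: "):]
        if data == "[DONE]":
            break
        print(data, end="", flush=True)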
WebSocket chat
PYTHON
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from openai import AsyncOpenAI
import json, os

app = FastAPI()
client = AsyncOpenAI(
    base_url="https://api.modelriver.com/v1",
    api_key=os.environ["MODELRIVER_API_KEY"],
)

@app.websocket("/ws/chat")
async def websocket_chat(ws: WebSocket):
    await ws.accept()
    messages = []

    try:
        while True:
            data = await ws.receive_text()
            user_msg = json.loads(data)
            messages.append({"role": "user", "content": user_msg["content"]})

            stream = await client.chat.completions.create(
                model="my-chat-workflow",
                messages=messages,
                stream=True,
            )

            full_response = ""
            async for chunk in stream:
                content = chunk.choices[0].delta.content
                if content:
                    full_response += content
                    await ws.send_text(json.dumps({"type": "chunk", "content": content}))

            messages.append({"role": "assistant", "content": full_response})
            await ws.send_text(json.dumps({"type": "done"}))

    except WebSocketDisconnect:
        pass
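A client for this endpoint sends a JSON message and reads chunk events until it sees done. The sketch below uses the third-party websockets package (pip install websockets); the message shape matches the handler above.
PYTHON
# Sketch of a WebSocket client for /ws/chat using the websockets package
import asyncio
import json

import websockets

async def main():
    async with websockets.connect("ws://localhost:8000/ws/chat") as ws:
        await ws.send(json.dumps({"content": "Hello!"}))
        while True:
            event = json.loads(await ws.recv())
            if event["type"] == "done":
                break
            print(event["content"], end="", flush=True)

asyncio.run(main())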
Background tasks
PYTHON
from fastapi import FastAPI, BackgroundTasks
from openai import AsyncOpenAI
import os

app = FastAPI()
client = AsyncOpenAI(
    base_url="https://api.modelriver.com/v1",
    api_key=os.environ["MODELRIVER_API_KEY"],
)

async def process_document(doc_id: str, content: str):
    """Background task: summarise a document."""
    response = await client.chat.completions.create(
        model="my-summary-workflow",
        messages=[
            {"role": "system", "content": "Summarise the following document."},
            {"role": "user", "content": content},
        ],
    )
    summary = response.choices[0].message.content
    # Save summary to database...
    print(f"Document {doc_id} summarised: {summary[:100]}")

@app.post("/documents/{doc_id}/summarise")
async def summarise_document(doc_id: str, background_tasks: BackgroundTasks):
    content = "..."  # fetch from DB
    background_tasks.add_task(process_document, doc_id, content)
    return {"status": "processing", "doc_id": doc_id}
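Because add_task runs the coroutine only after the response has been sent, callers need a separate way to fetch the result once it is ready. A minimal sketch, using an in-memory dict as a stand-in for the database write hinted at above (the summaries dict and the GET route are illustrative, not part of the example):
PYTHON
# Illustrative only: an in-memory store standing in for a real database
summaries: dict[str, str] = {}

# In process_document above, replace the print with:
#     summaries[doc_id] = summary

@app.get("/documents/{doc_id}/summary")
async def get_summary(doc_id: str):
    if doc_id not in summaries:
        return {"status": "processing", "doc_id": doc_id}
    return {"status": "done", "doc_id": doc_id, "summary": summaries[doc_id]}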
Dependency injection
PYTHON
from fastapi import Depends
from openai import AsyncOpenAI
import os

# app and ChatRequest are defined as in the basic endpoint example above

def get_ai_client() -> AsyncOpenAI:
    return AsyncOpenAI(
        base_url="https://api.modelriver.com/v1",
        api_key=os.environ["MODELRIVER_API_KEY"],
    )

@app.post("/chat")
async def chat(req: ChatRequest, ai: AsyncOpenAI = Depends(get_ai_client)):
    response = await ai.chat.completions.create(
        model=req.workflow,
        messages=[{"role": "user", "content": req.message}],
    )
    return {"content": response.choices[0].message.content}
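The main payoff of injecting the client is testability: tests can swap get_ai_client for a stub via FastAPI's dependency_overrides. A sketch, assuming the app and dependency live in main.py and that the stub only mirrors the simplified shape of the SDK's completion object:
PYTHON
# Sketch of a test that replaces the AI client with a mock
from unittest.mock import AsyncMock, MagicMock

from fastapi.testclient import TestClient

from main import app, get_ai_client  # assumption: the code above lives in main.py

def fake_ai_client():
    fake = MagicMock()
    completion = MagicMock()
    completion.choices = [MagicMock(message=MagicMock(content="stubbed reply"))]
    fake.chat.completions.create = AsyncMock(return_value=completion)
    return fake

app.dependency_overrides[get_ai_client] = fake_ai_client
client = TestClient(app)

def test_chat():
    resp = client.post("/chat", json={"message": "hi"})
    assert resp.status_code == 200
    assert resp.json()["content"] == "stubbed reply"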
Best practices
- Use AsyncOpenAI: FastAPI is async-first; always use the async client
- Add proper error handling: Catch openai.APIError and return meaningful HTTP errors (see the sketch after this list)
- Use dependency injection: Makes testing easier with mock clients
- Stream for long responses: Use SSE or WebSocket for user-facing endpoints
- Background tasks for batch processing: Don't block request handlers with heavy AI calls
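For the error-handling point above, a minimal sketch of wrapping the call and mapping SDK exceptions to HTTP errors (the status codes chosen here are illustrative):
PYTHON
# Sketch: translate OpenAI SDK errors into HTTP errors
from fastapi import HTTPException
import openai

@app.post("/chat", response_model=ChatResponse)
async def chat(req: ChatRequest):
    try:
        response = await client.chat.completions.create(
            model=req.workflow,
            messages=[{"role": "user", "content": req.message}],
        )
    except openai.RateLimitError:
        raise HTTPException(status_code=429, detail="Upstream rate limit hit, retry later")
    except openai.APIError as exc:
        raise HTTPException(status_code=502, detail=f"AI provider error: {exc}")

    return ChatResponse(
        content=response.choices[0].message.content,
        model=response.model,
        tokens=response.usage.total_tokens,
    )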
Next steps
- Django integration: Alternative Python framework
- Streaming guide: SSE deep dive
- API reference: Endpoint documentation