
FastAPI + ModelRiver

High-performance AI APIs with async Python: REST endpoints, WebSocket streaming, and background task processing, all routed through ModelRiver.

Overview

FastAPI is a modern, async-first Python web framework built around async/await. Combined with ModelRiver, you can build AI-powered REST APIs, WebSocket streaming endpoints, and background processing pipelines with minimal code.


Quick start

Install dependencies

Bash
pip install fastapi uvicorn openai python-dotenv
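
All of the examples below read MODELRIVER_API_KEY from the environment. If you keep the key in a local .env file, python-dotenv (installed above) can load it at startup; a minimal sketch:

PYTHON
# call once at startup, before the AsyncOpenAI client is created
from dotenv import load_dotenv

load_dotenv()  # reads MODELRIVER_API_KEY from a .env file in the working directory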

Basic AI endpoint

PYTHON
# main.py
from fastapi import FastAPI
from openai import AsyncOpenAI
from pydantic import BaseModel
import os

app = FastAPI()

client = AsyncOpenAI(
    base_url="https://api.modelriver.com/v1",
    api_key=os.environ["MODELRIVER_API_KEY"],
)

class ChatRequest(BaseModel):
    message: str
    workflow: str = "my-chat-workflow"

class ChatResponse(BaseModel):
    content: str
    model: str
    tokens: int

@app.post("/chat", response_model=ChatResponse)
async def chat(req: ChatRequest):
    response = await client.chat.completions.create(
        model=req.workflow,
        messages=[{"role": "user", "content": req.message}],
    )

    return ChatResponse(
        content=response.choices[0].message.content,
        model=response.model,
        tokens=response.usage.total_tokens,
    )
Bash
uvicorn main:app --reload
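
With the development server running on the default port 8000, the endpoint can be exercised from any HTTP client; a quick sketch using httpx (pip install httpx if needed; it also ships as a dependency of the openai SDK):

PYTHON
import httpx

resp = httpx.post(
    "http://localhost:8000/chat",
    json={"message": "Hello!", "workflow": "my-chat-workflow"},
    timeout=60,
)
print(resp.json())  # {"content": "...", "model": "...", "tokens": ...}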

Streaming with SSE

PYTHON
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from openai import AsyncOpenAI
from pydantic import BaseModel
import os

app = FastAPI()
client = AsyncOpenAI(
    base_url="https://api.modelriver.com/v1",
    api_key=os.environ["MODELRIVER_API_KEY"],
)

class ChatRequest(BaseModel):
    # same request model as in the quick start example
    message: str
    workflow: str = "my-chat-workflow"

@app.post("/chat/stream")
async def chat_stream(req: ChatRequest):
    async def generate():
        stream = await client.chat.completions.create(
            model=req.workflow,
            messages=[{"role": "user", "content": req.message}],
            stream=True,
        )
        async for chunk in stream:
            content = chunk.choices[0].delta.content
            if content:
                yield f"data: {content}\n\n"
        yield "data: [DONE]\n\n"

    return StreamingResponse(generate(), media_type="text/event-stream")
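
On the client side the stream arrives as Server-Sent Events, one data: line per chunk; a minimal consumption sketch with httpx (the [DONE] sentinel matches the endpoint above):

PYTHON
import httpx

with httpx.stream(
    "POST",
    "http://localhost:8000/chat/stream",
    json={"message": "Tell me a joke"},
    timeout=None,
) as resp:
    for line in resp.iter_lines():
        # each SSE event is "data: <text>"; stop at the [DONE] sentinel
        if line == "data: [DONE]":
            break
        if line.startswith("data: "):
            print(line[len("data: "):], end="", flush=True)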

WebSocket chat

PYTHON
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from openai import AsyncOpenAI
import json, os

app = FastAPI()
client = AsyncOpenAI(
    base_url="https://api.modelriver.com/v1",
    api_key=os.environ["MODELRIVER_API_KEY"],
)

@app.websocket("/ws/chat")
async def websocket_chat(ws: WebSocket):
    await ws.accept()
    messages = []

    try:
        while True:
            data = await ws.receive_text()
            user_msg = json.loads(data)
            messages.append({"role": "user", "content": user_msg["content"]})

            stream = await client.chat.completions.create(
                model="my-chat-workflow",
                messages=messages,
                stream=True,
            )

            full_response = ""
            async for chunk in stream:
                content = chunk.choices[0].delta.content
                if content:
                    full_response += content
                    await ws.send_text(json.dumps({"type": "chunk", "content": content}))

            messages.append({"role": "assistant", "content": full_response})
            await ws.send_text(json.dumps({"type": "done"}))

    except WebSocketDisconnect:
        pass
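
Any WebSocket client can drive this endpoint; a sketch using the third-party websockets package (pip install websockets, not part of the dependency list above):

PYTHON
import asyncio, json
import websockets

async def main():
    async with websockets.connect("ws://localhost:8000/ws/chat") as ws:
        await ws.send(json.dumps({"content": "Hi there!"}))
        while True:
            event = json.loads(await ws.recv())
            if event["type"] == "chunk":
                print(event["content"], end="", flush=True)
            elif event["type"] == "done":
                break

asyncio.run(main())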

Background tasks

PYTHON
from fastapi import FastAPI, BackgroundTasks
from openai import AsyncOpenAI
import os

app = FastAPI()
client = AsyncOpenAI(
    base_url="https://api.modelriver.com/v1",
    api_key=os.environ["MODELRIVER_API_KEY"],
)

async def process_document(doc_id: str, content: str):
    """Background task: summarise a document."""
    response = await client.chat.completions.create(
        model="my-summary-workflow",
        messages=[
            {"role": "system", "content": "Summarise the following document."},
            {"role": "user", "content": content},
        ],
    )
    summary = response.choices[0].message.content
    # Save summary to database...
    print(f"Document {doc_id} summarised: {summary[:100]}")

@app.post("/documents/{doc_id}/summarise")
async def summarise_document(doc_id: str, background_tasks: BackgroundTasks):
    content = "..."  # fetch from DB
    background_tasks.add_task(process_document, doc_id, content)
    return {"status": "processing", "doc_id": doc_id}

Dependency injection

PYTHON
from fastapi import Depends
from openai import AsyncOpenAI
import os

# assumes the app instance and ChatRequest model from the quick start example

def get_ai_client() -> AsyncOpenAI:
    return AsyncOpenAI(
        base_url="https://api.modelriver.com/v1",
        api_key=os.environ["MODELRIVER_API_KEY"],
    )

@app.post("/chat")
async def chat(req: ChatRequest, ai: AsyncOpenAI = Depends(get_ai_client)):
    response = await ai.chat.completions.create(
        model=req.workflow,
        messages=[{"role": "user", "content": req.message}],
    )
    return {"content": response.choices[0].message.content}

Best practices

  1. Use AsyncOpenAI: FastAPI is async-first; always use the async client
  2. Add proper error handling: Catch openai.APIError and return a meaningful HTTP error (see the sketch after this list)
  3. Use dependency injection: Makes testing easier with mock clients
  4. Stream for long responses: Use SSE or WebSocket for user-facing endpoints
  5. Background tasks for batch processing: Don't block request handlers with heavy AI calls
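
For point 2, one way to translate SDK errors into HTTP errors (a sketch reusing the client, ChatRequest, and ChatResponse from the quick start; the status codes are a suggested mapping, not a requirement):

PYTHON
from fastapi import HTTPException
import openai

@app.post("/chat", response_model=ChatResponse)
async def chat(req: ChatRequest):
    try:
        response = await client.chat.completions.create(
            model=req.workflow,
            messages=[{"role": "user", "content": req.message}],
        )
    except openai.RateLimitError:
        # surface upstream throttling as 429 so callers can back off
        raise HTTPException(status_code=429, detail="AI backend is rate limiting requests")
    except openai.APIError as exc:
        # any other SDK failure becomes a 502: the upstream AI call failed, not this API
        raise HTTPException(status_code=502, detail=f"AI request failed: {exc}")

    return ChatResponse(
        content=response.choices[0].message.content,
        model=response.model,
        tokens=response.usage.total_tokens,
    )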

Next steps