Documentation

Event-driven AI with FastAPI

Receive AI webhooks with async FastAPI handlers, process them with background tasks, and call back to ModelRiver — with built-in Pydantic validation throughout.

Overview

FastAPI's async-first architecture and built-in Pydantic validation make it an excellent choice for handling ModelRiver event-driven AI webhooks. Process AI results in background tasks, validate payloads with type-safe models, and execute non-blocking callbacks.

What you'll build:

  • An async webhook endpoint with Pydantic payload validation
  • HMAC signature verification dependency
  • Background task processing with BackgroundTasks
  • Async HTTP callback to ModelRiver

Quick start

Install dependencies

Bash
pip install fastapi uvicorn httpx pydantic-settings python-dotenv

Configuration

PYTHON
# config.py
from pydantic_settings import BaseSettings, SettingsConfigDict


class Settings(BaseSettings):
    """Application settings loaded from the environment (and a local .env file).

    Required values — startup fails fast with a validation error if missing:
      modelriver_api_key: bearer token for calls to the ModelRiver API.
      modelriver_webhook_secret: shared secret for webhook HMAC verification.
    """

    modelriver_api_key: str
    modelriver_webhook_secret: str

    # Pydantic v2 style; the inner `class Config` used previously is deprecated.
    model_config = SettingsConfigDict(env_file=".env")


# Single shared settings instance, imported by the rest of the app.
settings = Settings()

Webhook handler

PYTHON
1# main.py
import hashlib
import hmac
from datetime import datetime
from typing import Any, Optional

import httpx
from fastapi import BackgroundTasks, Depends, FastAPI, HTTPException, Request
from pydantic import BaseModel, Field

from config import settings
11 
12app = FastAPI()
13 
14 
# --- Pydantic models ---

class AIResponse(BaseModel):
    """AI-generated result attached to a webhook event."""

    # Arbitrary model output; the schema depends on the workflow that ran.
    data: dict[str, Any]


class WebhookPayload(BaseModel):
    """Incoming ModelRiver webhook body."""

    type: str
    event: Optional[str] = None
    channel_id: str
    ai_response: Optional[AIResponse] = None
    callback_url: Optional[str] = None
    callback_required: Optional[bool] = None
    # default_factory is the idiomatic pydantic way to declare dict defaults
    # (clearer than a bare `{}` literal, even though pydantic copies defaults).
    customer_data: dict[str, Any] = Field(default_factory=dict)
    meta: dict[str, Any] = Field(default_factory=dict)
    timestamp: Optional[str] = None


class CallbackPayload(BaseModel):
    """Body POSTed back to ModelRiver after successful processing."""

    data: dict[str, Any]
    task_id: Optional[str] = None
    metadata: dict[str, Any] = Field(default_factory=dict)


class ErrorCallback(BaseModel):
    """Body POSTed back to ModelRiver when processing fails."""

    error: str
    message: str
42 
43 
44# --- Signature verification ---
45 
async def verify_signature(request: Request):
    """FastAPI dependency rejecting requests with a bad HMAC signature.

    Recomputes the SHA-256 HMAC of the raw body using the shared webhook
    secret and compares it to the `mr-signature` header. Returns the raw
    body on success; raises HTTP 401 otherwise.
    """
    raw_body = await request.body()
    provided = request.headers.get("mr-signature", "")

    secret = settings.modelriver_webhook_secret.encode("utf-8")
    digest = hmac.new(secret, raw_body, hashlib.sha256).hexdigest()

    # compare_digest is constant-time, preventing timing attacks on the signature.
    if hmac.compare_digest(provided, digest):
        return raw_body
    raise HTTPException(status_code=401, detail="Invalid signature")
61 
62 
63# --- Webhook endpoint ---
64 
@app.post("/webhooks/modelriver")
async def handle_webhook(
    payload: WebhookPayload,
    background_tasks: BackgroundTasks,
    _verified: bytes = Depends(verify_signature),
):
    """Accept a ModelRiver webhook, queueing event-driven work when needed.

    Always acknowledges with `{"received": True}` immediately; any heavy
    processing runs in a background task after the response is sent.
    """
    is_event_driven = payload.type == "task.ai_generated" and payload.callback_url

    if is_event_driven:
        # Queue the processing + callback so the webhook returns 200 right away.
        background_tasks.add_task(
            process_and_callback,
            event=payload.event,
            ai_response=payload.ai_response,
            callback_url=payload.callback_url,
            customer_data=payload.customer_data,
        )

    # Same acknowledgement for event-driven and standard webhooks.
    return {"received": True}
84 
85 
86# --- Background processing ---
87 
async def process_and_callback(
    event: str | None,
    ai_response: AIResponse | None,
    callback_url: str,
    customer_data: dict[str, Any],
):
    """Process the AI response and call back to ModelRiver.

    Runs as a FastAPI background task. On HTTP failure a best-effort error
    callback is POSTed instead, so ModelRiver is not left waiting.

    Args:
        event: Workflow event name (drives which business logic runs).
        ai_response: AI result from the webhook; may be None.
        callback_url: ModelRiver URL to POST the processed result to.
        customer_data: Customer context from the webhook, available to
            business logic (currently unused here).
    """
    # Both the success and error callbacks use the same auth headers.
    headers = {
        "Authorization": f"Bearer {settings.modelriver_api_key}",
        "Content-Type": "application/json",
    }

    async with httpx.AsyncClient() as client:
        try:
            enriched_data = dict(ai_response.data) if ai_response else {}

            # Your custom business logic
            if event == "content_ready":
                # Save to database, generate slugs, etc.
                enriched_data["processed"] = True
                enriched_data["saved_at"] = datetime.now().isoformat()

            if event == "entities_extracted":
                # Validate and store entities
                entities = enriched_data.get("entities", [])
                enriched_data["entities_validated"] = len(entities)

            # Call back to ModelRiver
            callback = CallbackPayload(
                data=enriched_data,
                task_id=f"fastapi_{event}_{int(datetime.now().timestamp())}",
                metadata={"processed_by": "fastapi"},
            )

            response = await client.post(
                callback_url,
                json=callback.model_dump(),
                headers=headers,
                timeout=10.0,
            )
            response.raise_for_status()
            print(f"✅ Callback sent for event: {event}")

        except httpx.HTTPError as exc:
            print(f"❌ Callback failed: {exc}")

            # Tell ModelRiver processing failed so it doesn't wait forever.
            error = ErrorCallback(
                error="processing_failed",
                message=str(exc),
            )
            try:
                await client.post(
                    callback_url,
                    json=error.model_dump(),
                    headers=headers,
                    timeout=10.0,
                )
            except Exception as cb_exc:
                # Best effort only — log instead of crashing the background task.
                print(f"❌ Error callback also failed: {cb_exc}")

Triggering async requests

PYTHON
1# ai/client.py
2import httpx
3from config import settings
4 
5 
async def trigger_async_ai(
    workflow: str, prompt: str, metadata: dict | None = None
) -> dict:
    """Trigger an event-driven AI request on ModelRiver.

    Args:
        workflow: Name of the ModelRiver workflow to run.
        prompt: User prompt, sent as a single chat message.
        metadata: Optional metadata attached to the request.
            (Annotated `dict | None` — the previous bare `dict = None`
            annotation was incorrect for a None default.)

    Returns:
        The ModelRiver response JSON (channel and WebSocket token details).

    Raises:
        httpx.HTTPStatusError: If ModelRiver responds with a non-2xx status.
    """
    async with httpx.AsyncClient() as client:
        response = await client.post(
            "https://api.modelriver.com/v1/ai/async",
            headers={
                "Authorization": f"Bearer {settings.modelriver_api_key}",
                "Content-Type": "application/json",
            },
            json={
                "workflow": workflow,
                "messages": [{"role": "user", "content": prompt}],
                "metadata": metadata or {},
            },
            timeout=10.0,
        )
        response.raise_for_status()
        return response.json()
24 
25 
# Usage in an endpoint
@app.post("/ai/generate")
async def generate_content(prompt: str):
    """Kick off async content generation and return channel details to the client."""
    result = await trigger_async_ai(
        workflow="content_generator",
        prompt=prompt,
        metadata={"source": "api"},
    )
    # Forward only the fields the frontend needs to open the WebSocket.
    wanted = ("channel_id", "ws_token", "websocket_channel")
    return {key: result[key] for key in wanted}

WebSocket proxy

If your frontend can't connect to ModelRiver's WebSocket directly, proxy through FastAPI:

PYTHON
from fastapi import WebSocket, WebSocketDisconnect
import websockets
import json
# NOTE: the original snippet also imported asyncio, which was unused — removed.


@app.websocket("/ws/ai/{channel_id}")
async def ai_websocket_proxy(websocket: WebSocket, channel_id: str):
    """Proxy a client WebSocket to ModelRiver's channel for one AI response.

    Joins the `ai_response:{channel_id}` channel upstream, forwards the first
    "response" event payload to the client, then stops proxying.
    """
    await websocket.accept()

    # The frontend passes the ws_token it received from /ai/generate.
    ws_token = websocket.query_params.get("token", "")

    async with websockets.connect(
        f"wss://api.modelriver.com/socket/websocket?token={ws_token}"
    ) as mr_ws:
        # Join the ModelRiver channel ("phx_join" handshake).
        await mr_ws.send(json.dumps({
            "topic": f"ai_response:{channel_id}",
            "event": "phx_join",
            "payload": {},
            "ref": "1",
        }))

        try:
            async for message in mr_ws:
                data = json.loads(message)
                if data.get("event") == "response":
                    # Relay the final AI payload, then stop reading upstream.
                    await websocket.send_json(data["payload"])
                    break
        except WebSocketDisconnect:
            # Client went away; the context manager closes the upstream socket.
            pass

Best practices

  1. Use httpx.AsyncClient: FastAPI is async-first; avoid blocking requests library.
  2. Validate with Pydantic: Type-safe webhook payloads catch malformed data early.
  3. Use BackgroundTasks: Return 200 immediately and process in the background.
  4. Inject dependencies: Use Depends(verify_signature) for clean, reusable verification.
  5. Handle timeouts: Set explicit timeout on all HTTP calls.

Next steps