Overview
FastAPI's async-first architecture and built-in Pydantic validation make it an excellent choice for handling ModelRiver event-driven AI webhooks. Process AI results in background tasks, validate payloads with type-safe models, and execute non-blocking callbacks.
What you'll build:
- An async webhook endpoint with Pydantic payload validation
- HMAC signature verification dependency
- Background task processing with
BackgroundTasks - Async HTTP callback to ModelRiver
Quick start
Install dependencies
Bash
pip install fastapi uvicorn httpx python-dotenvConfiguration
PYTHON
1# config.py2from pydantic_settings import BaseSettings3 4 5class Settings(BaseSettings):6 modelriver_api_key: str7 modelriver_webhook_secret: str8 9 class Config:10 env_file = ".env"11 12 13settings = Settings()Webhook handler
PYTHON
1# main.py2import hmac3import hashlib4from typing import Any, Optional5 6import httpx7from fastapi import FastAPI, Request, BackgroundTasks, HTTPException, Depends8from pydantic import BaseModel9 10from config import settings11 12app = FastAPI()13 14 15# --- Pydantic models ---16 17class AIResponse(BaseModel):18 data: dict[str, Any]19 20 21class WebhookPayload(BaseModel):22 type: str23 event: Optional[str] = None24 channel_id: str25 ai_response: Optional[AIResponse] = None26 callback_url: Optional[str] = None27 callback_required: Optional[bool] = None28 customer_data: dict[str, Any] = {}29 meta: dict[str, Any] = {}30 timestamp: Optional[str] = None31 32 33class CallbackPayload(BaseModel):34 data: dict[str, Any]35 task_id: Optional[str] = None36 metadata: dict[str, Any] = {}37 38 39class ErrorCallback(BaseModel):40 error: str41 message: str42 43 44# --- Signature verification ---45 46async def verify_signature(request: Request):47 """FastAPI dependency that verifies the webhook signature."""48 signature = request.headers.get("mr-signature", "")49 raw_body = await request.body()50 51 expected = hmac.new(52 settings.modelriver_webhook_secret.encode("utf-8"),53 raw_body,54 hashlib.sha256,55 ).hexdigest()56 57 if not hmac.compare_digest(signature, expected):58 raise HTTPException(status_code=401, detail="Invalid signature")59 60 return raw_body61 62 63# --- Webhook endpoint ---64 65@app.post("/webhooks/modelriver")66async def handle_webhook(67 payload: WebhookPayload,68 background_tasks: BackgroundTasks,69 _verified: bytes = Depends(verify_signature),70):71 # Handle event-driven workflow72 if payload.type == "task.ai_generated" and payload.callback_url:73 background_tasks.add_task(74 process_and_callback,75 event=payload.event,76 ai_response=payload.ai_response,77 callback_url=payload.callback_url,78 customer_data=payload.customer_data,79 )80 return {"received": True}81 82 # Standard webhook83 return {"received": True}84 85 86# --- Background processing ---87 88async def process_and_callback(89 event: str | None,90 ai_response: AIResponse | None,91 callback_url: str,92 customer_data: dict[str, Any],93):94 """Process the AI response and call back to ModelRiver."""95 async with httpx.AsyncClient() as client:96 try:97 enriched_data = {**(ai_response.data if ai_response else {})}98 99 # Your custom business logic100 if event == "content_ready":101 # Save to database, generate slugs, etc.102 enriched_data["processed"] = True103 enriched_data["saved_at"] = datetime.now().isoformat()104 105 if event == "entities_extracted":106 # Validate and store entities107 entities = enriched_data.get("entities", [])108 enriched_data["entities_validated"] = len(entities)109 110 # Call back to ModelRiver111 callback = CallbackPayload(112 data=enriched_data,113 task_id=f"fastapi_{event}_{int(datetime.now().timestamp())}",114 metadata={"processed_by": "fastapi"},115 )116 117 response = await client.post(118 callback_url,119 json=callback.model_dump(),120 headers={121 "Authorization": f"Bearer {settings.modelriver_api_key}",122 "Content-Type": "application/json",123 },124 timeout=10.0,125 )126 response.raise_for_status()127 print(f"✅ Callback sent for event: {event}")128 129 except httpx.HTTPError as exc:130 print(f"❌ Callback failed: {exc}")131 132 # Send error callback133 error = ErrorCallback(134 error="processing_failed",135 message=str(exc),136 )137 try:138 await client.post(139 callback_url,140 json=error.model_dump(),141 headers={142 "Authorization": f"Bearer {settings.modelriver_api_key}",143 "Content-Type": "application/json",144 },145 timeout=10.0,146 )147 except Exception:148 pass149 150 151from datetime import datetimeTriggering async requests
PYTHON
1# ai/client.py2import httpx3from config import settings4 5 6async def trigger_async_ai(workflow: str, prompt: str, metadata: dict = None) -> dict:7 """Trigger an event-driven AI request."""8 async with httpx.AsyncClient() as client:9 response = await client.post(10 "https://api.modelriver.com/v1/ai/async",11 headers={12 "Authorization": f"Bearer {settings.modelriver_api_key}",13 "Content-Type": "application/json",14 },15 json={16 "workflow": workflow,17 "messages": [{"role": "user", "content": prompt}],18 "metadata": metadata or {},19 },20 timeout=10.0,21 )22 response.raise_for_status()23 return response.json()24 25 26# Usage in an endpoint27@app.post("/ai/generate")28async def generate_content(prompt: str):29 result = await trigger_async_ai(30 workflow="content_generator",31 prompt=prompt,32 metadata={"source": "api"},33 )34 return {35 "channel_id": result["channel_id"],36 "ws_token": result["ws_token"],37 "websocket_channel": result["websocket_channel"],38 }WebSocket proxy
If your frontend can't connect to ModelRiver's WebSocket directly, proxy through FastAPI:
PYTHON
1from fastapi import WebSocket, WebSocketDisconnect2import asyncio3import websockets4import json5 6 7@app.websocket("/ws/ai/{channel_id}")8async def ai_websocket_proxy(websocket: WebSocket, channel_id: str):9 await websocket.accept()10 11 # Get ws_token from query params12 ws_token = websocket.query_params.get("token", "")13 14 async with websockets.connect(15 f"wss://api.modelriver.com/socket/websocket?token={ws_token}"16 ) as mr_ws:17 # Join the ModelRiver channel18 await mr_ws.send(json.dumps({19 "topic": f"ai_response:{channel_id}",20 "event": "phx_join",21 "payload": {},22 "ref": "1",23 }))24 25 try:26 async for message in mr_ws:27 data = json.loads(message)28 if data.get("event") == "response":29 await websocket.send_json(data["payload"])30 break31 except WebSocketDisconnect:32 passBest practices
- Use
httpx.AsyncClient: FastAPI is async-first; avoid blockingrequestslibrary. - Validate with Pydantic: Type-safe webhook payloads catch malformed data early.
- Use
BackgroundTasks: Return200immediately and process in the background. - Inject dependencies: Use
Depends(verify_signature)for clean, reusable verification. - Handle timeouts: Set explicit
timeouton all HTTP calls.
Next steps
- Laravel event-driven guide: PHP alternative
- FastAPI integration: Standard ModelRiver + FastAPI usage
- Webhooks reference: Retry policies and delivery monitoring