概述
FastAPI 的异步优先架构和内置的 Pydantic 验证使其成为处理 ModelRiver 事件驱动 AI Webhooks 的绝佳选择。您可以在后台任务中处理 AI 结果,使用类型安全的模型验证负载,并执行非阻塞的回调。
您将构建的内容:
- 一个具有 Pydantic 负载验证功能的异步 Webhook 端点
- HMAC 签名验证依赖项 (Dependency)
- 使用 BackgroundTasks 进行后台任务处理
- 向 ModelRiver 发起异步 HTTP 回调
快速开始
安装依赖
Bash
pip install fastapi uvicorn httpx python-dotenv pydantic-settings
配置
PYTHON
1# config.py2from pydantic_settings import BaseSettings3 4 5class Settings(BaseSettings):6 modelriver_api_key: str7 modelriver_webhook_secret: str8 9 class Config:10 env_file = ".env"11 12 13settings = Settings()Webhook 处理程序
PYTHON
# main.py
import hmac
import hashlib
from typing import Any, Optional
from datetime import datetime, timezone

import httpx
from fastapi import FastAPI, Request, BackgroundTasks, HTTPException, Depends
from pydantic import BaseModel

from config import settings

app = FastAPI()


# --- Pydantic models ---

class AIResponse(BaseModel):
    """AI result carried by a webhook, exposed as a free-form ``data`` mapping."""

    data: dict[str, Any]


class WebhookPayload(BaseModel):
    """Body of an incoming webhook; only ``type`` and ``channel_id`` are required."""

    type: str
    event: Optional[str] = None
    channel_id: str
    ai_response: Optional[AIResponse] = None
    callback_url: Optional[str] = None
    callback_required: Optional[bool] = None
    customer_data: dict[str, Any] = {}
    meta: dict[str, Any] = {}
    timestamp: Optional[str] = None


class CallbackPayload(BaseModel):
    """Success payload posted back to the callback URL."""

    data: dict[str, Any]
    task_id: Optional[str] = None
    metadata: dict[str, Any] = {}


class ErrorCallback(BaseModel):
    """Failure payload posted back to the callback URL."""

    error: str
    message: str


# --- Signature verification ---

async def verify_signature(request: Request):
    """FastAPI dependency that verifies the webhook's HMAC-SHA256 signature.

    The hex digest of the raw request body, keyed with
    ``settings.modelriver_webhook_secret``, is compared with the
    ``mr-signature`` request header.

    Returns:
        The raw request body bytes when the signature matches.

    Raises:
        HTTPException: 401 when the header is missing or does not match.
    """
    signature = request.headers.get("mr-signature", "")
    raw_body = await request.body()

    expected = hmac.new(
        settings.modelriver_webhook_secret.encode("utf-8"),
        raw_body,
        hashlib.sha256,
    ).hexdigest()

    # Compare as bytes: compare_digest() raises TypeError for str arguments
    # that contain non-ASCII characters, which would turn a malformed header
    # into a 500 response instead of a 401.
    if not hmac.compare_digest(signature.encode("utf-8"), expected.encode("utf-8")):
        raise HTTPException(status_code=401, detail="Invalid signature")

    return raw_body


# --- Webhook endpoint ---

@app.post("/webhooks/modelriver")
async def handle_webhook(
    payload: WebhookPayload,
    background_tasks: BackgroundTasks,
    _verified: bytes = Depends(verify_signature),
):
    """Acknowledge a verified webhook and schedule follow-up work if needed.

    ``task.ai_generated`` events that carry a ``callback_url`` are processed
    in a background task; every request is acknowledged with
    ``{"received": True}``.
    """
    # Event-driven workflow: process in the background and call back later.
    if payload.type == "task.ai_generated" and payload.callback_url:
        background_tasks.add_task(
            process_and_callback,
            event=payload.event,
            ai_response=payload.ai_response,
            callback_url=payload.callback_url,
            customer_data=payload.customer_data,
        )

    # Standard webhooks need no follow-up; both paths acknowledge identically.
    return {"received": True}


# --- Background processing ---

def _callback_headers() -> dict[str, str]:
    """Build the headers shared by every callback request."""
    return {
        "Authorization": f"Bearer {settings.modelriver_api_key}",
        "Content-Type": "application/json",
    }


def _enrich(event: str | None, ai_response: AIResponse | None) -> dict[str, Any]:
    """Apply the per-event business logic and return the data to send back."""
    enriched_data = {**(ai_response.data if ai_response else {})}

    # Your custom business logic
    if event == "content_ready":
        # Save to the database, generate a slug, etc.
        enriched_data["processed"] = True
        # Timezone-aware UTC so the timestamp is unambiguous for the receiver.
        enriched_data["saved_at"] = datetime.now(timezone.utc).isoformat()

    if event == "entities_extracted":
        # Validate and store entities
        entities = enriched_data.get("entities", [])
        enriched_data["entities_validated"] = len(entities)

    return enriched_data


async def process_and_callback(
    event: str | None,
    ai_response: AIResponse | None,
    callback_url: str,
    customer_data: dict[str, Any],
):
    """Process the AI response and post the result to ``callback_url``.

    On any failure (processing or HTTP) an ``ErrorCallback`` with
    ``error="processing_failed"`` is posted to the same URL on a best-effort
    basis. ``customer_data`` is accepted for callers but not used here.
    """
    async with httpx.AsyncClient() as client:
        try:
            sent_at = int(datetime.now(timezone.utc).timestamp())
            callback = CallbackPayload(
                data=_enrich(event, ai_response),
                task_id=f"fastapi_{event}_{sent_at}",
                metadata={"processed_by": "fastapi"},
            )

            response = await client.post(
                callback_url,
                json=callback.model_dump(),
                headers=_callback_headers(),
                timeout=10.0,
            )
            response.raise_for_status()
            print(f"✅ 已针对事件发送回调: {event}")

        except Exception as exc:
            # Background-task boundary: catch processing errors as well as
            # HTTP errors so the error callback below is sent in both cases.
            print(f"❌ 回调失败: {exc}")

            error = ErrorCallback(
                error="processing_failed",
                message=str(exc),
            )
            try:
                await client.post(
                    callback_url,
                    json=error.model_dump(),
                    headers=_callback_headers(),
                    timeout=10.0,
                )
            except Exception as error_exc:
                # Best effort only, but report the failure instead of hiding it.
                print(f"❌ 错误回调发送失败: {error_exc}")
触发异步请求
PYTHON
1# ai/client.py2import httpx3from config import settings4 5 6async def trigger_async_ai(workflow: str, prompt: str, metadata: dict = None) -> dict:7 """触发事件驱动的 AI 请求。"""8 async with httpx.AsyncClient() as client:9 response = await client.post(10 "https://api.modelriver.com/v1/ai/async",11 headers={12 "Authorization": f"Bearer {settings.modelriver_api_key}",13 "Content-Type": "application/json",14 },15 json={16 "workflow": workflow,17 "messages": [{"role": "user", "content": prompt}],18 "metadata": metadata or {},19 },20 timeout=10.0,21 )22 response.raise_for_status()23 return response.json()24 25 26# 在端点中使用27@app.post("/ai/generate")28async def generate_content(prompt: str):29 result = await trigger_async_ai(30 workflow="content_generator",31 prompt=prompt,32 metadata={"source": "api"},33 )34 return {35 "channel_id": result["channel_id"],36 "ws_token": result["ws_token"],37 "websocket_channel": result["websocket_channel"],38 }WebSocket 代理
如果您的前端无法直接连接到 ModelRiver 的 WebSocket,可以通过 FastAPI 进行代理:
PYTHON
import asyncio
import json
from urllib.parse import urlencode

import websockets
from fastapi import WebSocket, WebSocketDisconnect


@app.websocket("/ws/ai/{channel_id}")
async def ai_websocket_proxy(websocket: WebSocket, channel_id: str):
    """Relay the first ``response`` event of a ModelRiver channel to the client.

    Accepts the client socket, connects upstream using the ``token`` query
    parameter, joins ``ai_response:{channel_id}``, forwards the payload of
    the first ``response`` event, and then closes the client socket.
    """
    await websocket.accept()

    # Read ws_token from the query parameters
    ws_token = websocket.query_params.get("token", "")

    # URL-encode the client-supplied token so characters such as "&", "=" or
    # "+" cannot corrupt the upstream query string or inject parameters.
    upstream_url = (
        "wss://api.modelriver.com/socket/websocket?"
        + urlencode({"token": ws_token})
    )

    try:
        async with websockets.connect(upstream_url) as mr_ws:
            # Join the ModelRiver channel
            await mr_ws.send(json.dumps({
                "topic": f"ai_response:{channel_id}",
                "event": "phx_join",
                "payload": {},
                "ref": "1",
            }))

            async for message in mr_ws:
                data = json.loads(message)
                if data.get("event") == "response":
                    await websocket.send_json(data["payload"])
                    break

        # Close explicitly rather than returning with the client socket
        # still open.
        await websocket.close()
    except WebSocketDisconnect:
        # The client disconnected; there is nothing left to deliver.
        pass
最佳实践
- 使用 httpx.AsyncClient:FastAPI 是异步优先的;请避免使用会产生阻塞的 requests 库。
- 使用 Pydantic 进行验证:类型安全的 Webhook 负载可以及早捕获格式错误的数据。
- 使用 BackgroundTasks:立即返回 200 并在后台进行处理。
- 注入依赖项:使用 Depends(verify_signature) 以实现整洁、可复用的验证逻辑。
- 处理超时:在所有 HTTP 调用上设置显式的 timeout。
下一步
- Laravel 事件驱动指南:PHP 备选方案
- FastAPI 集成:标准 ModelRiver + FastAPI 的用法
- Webhooks 参考:重试策略和投递监控