FastAPI 的事件驱动 AI

使用异步 FastAPI 处理程序接收 AI Webhooks,通过后台任务进行处理,然后回调 ModelRiver,全程内置完整的 Pydantic 验证。

概述

FastAPI 的异步优先架构和内置的 Pydantic 验证使其成为处理 ModelRiver 事件驱动 AI Webhooks 的绝佳选择。您可以在后台任务中处理 AI 结果,使用类型安全的模型验证负载,并执行非阻塞的回调。

您将构建的内容:

  • 一个具有 Pydantic 负载验证功能的异步 Webhook 端点
  • HMAC 签名验证依赖项 (Dependency)
  • 使用 BackgroundTasks 进行后台任务处理
  • 向 ModelRiver 发起异步 HTTP 回调

快速开始

安装依赖

Bash
# pydantic-settings provides BaseSettings (config.py); websockets is needed by the WebSocket proxy.
pip install fastapi uvicorn httpx python-dotenv pydantic-settings websockets

配置

PYTHON
# config.py
from pydantic_settings import BaseSettings, SettingsConfigDict


class Settings(BaseSettings):
    """Application settings, read from the environment and the ``.env`` file."""

    # Configured through ``model_config``: the nested ``class Config`` form is
    # deprecated in Pydantic v2 and emits a deprecation warning.
    model_config = SettingsConfigDict(env_file=".env")

    # Sent as the Bearer token on HTTP requests to ModelRiver.
    modelriver_api_key: str
    # HMAC key used to verify incoming webhook signatures.
    modelriver_webhook_secret: str


# Module-level instance imported by the other modules.
settings = Settings()

Webhook 处理程序

PYTHON
1# main.py
2import hmac
3import hashlib
4from typing import Any, Optional
5from datetime import datetime
6 
7import httpx
8from fastapi import FastAPI, Request, BackgroundTasks, HTTPException, Depends
9from pydantic import BaseModel
10 
11from config import settings
12 
13app = FastAPI()
14 
15 
16# --- Pydantic 模型 ---
17 
class AIResponse(BaseModel):
    """AI result carried in a webhook payload, wrapping a free-form ``data`` mapping."""

    data: dict[str, Any]
20 
21 
class WebhookPayload(BaseModel):
    """Body of an incoming webhook request.

    Only ``type`` and ``channel_id`` are required. Every other field falls
    back to ``None`` or an empty dict when it is absent from the request.
    """

    type: str
    event: str | None = None
    channel_id: str
    ai_response: AIResponse | None = None
    # A callback is only scheduled when this URL is present.
    callback_url: str | None = None
    callback_required: bool | None = None
    customer_data: dict[str, Any] = {}
    meta: dict[str, Any] = {}
    timestamp: str | None = None
32 
33 
class CallbackPayload(BaseModel):
    """JSON body posted to the callback URL after successful processing."""

    data: dict[str, Any]
    task_id: str | None = None
    metadata: dict[str, Any] = {}
38 
39 
class ErrorCallback(BaseModel):
    """JSON body posted to the callback URL when handling the event fails."""

    # Short error code, e.g. "processing_failed".
    error: str
    # Human-readable description of the failure.
    message: str
43 
44 
45# --- 签名验证 ---
46 
async def verify_signature(request: Request) -> bytes:
    """FastAPI dependency that verifies the webhook signature.

    Computes the HMAC-SHA256 hex digest of the raw request body, keyed with
    ``settings.modelriver_webhook_secret``, and compares it against the
    ``mr-signature`` request header.

    Args:
        request: The incoming request; its headers and raw body are read.

    Returns:
        The raw request body once the signature has been verified.

    Raises:
        HTTPException: 401 when the header is missing or does not match.
    """
    signature = request.headers.get("mr-signature", "")
    raw_body = await request.body()

    expected = hmac.new(
        settings.modelriver_webhook_secret.encode("utf-8"),
        raw_body,
        hashlib.sha256,
    ).hexdigest()

    # Compare as bytes: compare_digest() raises TypeError for str arguments
    # containing non-ASCII characters, which would turn a malformed header
    # into a 500 response instead of a 401.
    signature_matches = hmac.compare_digest(
        signature.encode("utf-8"),
        expected.encode("utf-8"),
    )
    if not signature_matches:
        raise HTTPException(status_code=401, detail="Invalid signature")

    return raw_body
62 
63 
64# --- Webhook 端点 ---
65 
@app.post("/webhooks/modelriver")
async def handle_webhook(
    payload: WebhookPayload,
    background_tasks: BackgroundTasks,
    _verified: bytes = Depends(verify_signature),
):
    """Acknowledge a webhook and schedule background processing when needed.

    The signature is checked by the ``verify_signature`` dependency. Every
    accepted request receives the same ``{"received": True}`` response.
    """
    # Event-driven workflow: an AI-generated task that carries a callback URL.
    wants_callback = payload.type == "task.ai_generated" and payload.callback_url

    if wants_callback:
        # Processing is queued as a background task rather than awaited here.
        background_tasks.add_task(
            process_and_callback,
            event=payload.event,
            ai_response=payload.ai_response,
            callback_url=payload.callback_url,
            customer_data=payload.customer_data,
        )

    # Standard webhooks need nothing beyond the acknowledgement.
    return {"received": True}
85 
86 
87# --- 后台处理 ---
88 
async def process_and_callback(
    event: str | None,
    ai_response: AIResponse | None,
    callback_url: str,
    customer_data: dict[str, Any],
):
    """Process the AI response and call back ModelRiver.

    Copies ``ai_response.data`` (or starts from an empty dict), applies the
    per-event enrichment, and POSTs the result to ``callback_url``. If either
    the processing or the delivery fails, a best-effort ``processing_failed``
    error callback is sent to the same URL.

    Args:
        event: Event name that selects the enrichment step; may be ``None``.
        ai_response: AI result to enrich; ``None`` is treated as empty data.
        callback_url: URL that receives the success or error callback.
        customer_data: Customer data from the webhook; unused by this sample
            logic.
    """
    # Both the success and the error callback use the same headers.
    headers = {
        "Authorization": f"Bearer {settings.modelriver_api_key}",
        "Content-Type": "application/json",
    }

    async with httpx.AsyncClient() as client:
        try:
            enriched_data = {**(ai_response.data if ai_response else {})}

            # Your custom business logic
            if event == "content_ready":
                # Save to the database, generate a slug, etc.
                enriched_data["processed"] = True
                enriched_data["saved_at"] = datetime.now().isoformat()

            if event == "entities_extracted":
                # Validate and store the entities; `or []` also covers an
                # explicit null value in the payload.
                entities = enriched_data.get("entities") or []
                enriched_data["entities_validated"] = len(entities)

            # Call back ModelRiver
            callback = CallbackPayload(
                data=enriched_data,
                task_id=f"fastapi_{event}_{int(datetime.now().timestamp())}",
                metadata={"processed_by": "fastapi"},
            )

            response = await client.post(
                callback_url,
                json=callback.model_dump(),
                headers=headers,
                timeout=10.0,
            )
            response.raise_for_status()
            print(f"✅ 已针对事件发送回调: {event}")

        except Exception as exc:
            # Task boundary: catch processing errors as well as HTTP errors,
            # so that a failure in the business logic is also reported.
            print(f"❌ 回调失败: {exc}")

            # Send the error callback
            error = ErrorCallback(
                error="processing_failed",
                message=str(exc),
            )
            try:
                await client.post(
                    callback_url,
                    json=error.model_dump(),
                    headers=headers,
                    timeout=10.0,
                )
            except Exception as error_exc:
                # Best effort only: log the failure instead of hiding it.
                print(f"❌ 错误回调发送失败: {error_exc}")

触发异步请求

PYTHON
1# ai/client.py
2import httpx
3from config import settings
4 
5 
async def trigger_async_ai(
    workflow: str,
    prompt: str,
    metadata: dict | None = None,
) -> dict:
    """Trigger an event-driven AI request.

    Args:
        workflow: Workflow name, sent as the ``workflow`` field.
        prompt: Text sent as the content of a single ``user`` message.
        metadata: Optional metadata; an empty dict is sent when it is omitted
            or empty.

    Returns:
        The decoded JSON body of the response.

    Raises:
        httpx.HTTPStatusError: If the response has a 4xx or 5xx status.
    """
    async with httpx.AsyncClient() as client:
        response = await client.post(
            "https://api.modelriver.com/v1/ai/async",
            headers={
                "Authorization": f"Bearer {settings.modelriver_api_key}",
                "Content-Type": "application/json",
            },
            json={
                "workflow": workflow,
                "messages": [{"role": "user", "content": prompt}],
                "metadata": metadata or {},
            },
            timeout=10.0,
        )
        response.raise_for_status()
        return response.json()
24 
25 
26# 在端点中使用
@app.post("/ai/generate")
async def generate_content(prompt: str):
    """Start an async generation and return the channel connection details."""
    result = await trigger_async_ai(
        workflow="content_generator",
        prompt=prompt,
        metadata={"source": "api"},
    )
    # Only these three fields of the API response are returned.
    returned_keys = ("channel_id", "ws_token", "websocket_channel")
    return {key: result[key] for key in returned_keys}

WebSocket 代理

如果您的前端无法直接连接到 ModelRiver 的 WebSocket,可以通过 FastAPI 进行代理:

PYTHON
1from fastapi import WebSocket, WebSocketDisconnect
2import asyncio
3import websockets
4import json
5 
6 
@app.websocket("/ws/ai/{channel_id}")
async def ai_websocket_proxy(websocket: WebSocket, channel_id: str):
    """Proxy a single AI response from ModelRiver's WebSocket to the client.

    Accepts the client connection, joins the ``ai_response:{channel_id}``
    topic upstream using the ``token`` query parameter, forwards the payload
    of the first ``response`` event to the client, then closes the client
    connection.

    Args:
        websocket: The client WebSocket connection.
        channel_id: Channel whose AI response should be forwarded.
    """
    # Function-scope import so the snippet does not need extra top-level imports.
    from urllib.parse import quote

    await websocket.accept()

    # Read ws_token from the query parameters.
    ws_token = websocket.query_params.get("token", "")

    # Re-encode the token: query parameters arrive percent-decoded, so
    # characters such as "&", "#" or "+" would otherwise corrupt the URL.
    upstream_url = (
        "wss://api.modelriver.com/socket/websocket"
        f"?token={quote(ws_token, safe='')}"
    )

    try:
        async with websockets.connect(upstream_url) as mr_ws:
            # Join the ModelRiver channel
            await mr_ws.send(json.dumps({
                "topic": f"ai_response:{channel_id}",
                "event": "phx_join",
                "payload": {},
                "ref": "1",
            }))

            try:
                async for message in mr_ws:
                    data = json.loads(message)
                    if data.get("event") == "response":
                        await websocket.send_json(data["payload"])
                        break
            except websockets.ConnectionClosed:
                # Upstream dropped the connection; fall through and close
                # the client side cleanly.
                pass

        # Close explicitly instead of leaving the socket open when the
        # handler returns.
        await websocket.close()
    except WebSocketDisconnect:
        # The client went away first; nothing is left to close.
        pass

最佳实践

  1. 使用 httpx.AsyncClient:FastAPI 是异步优先的;请避免使用会产生阻塞的 requests 库。
  2. 使用 Pydantic 进行验证:类型安全的 Webhook 负载可以及早捕获格式错误的数据。
  3. 使用 BackgroundTasks:立即返回 200 并在后台进行处理。
  4. 注入依赖项:使用 Depends(verify_signature) 以实现整洁、可复用的验证逻辑。
  5. 处理超时:在所有 HTTP 调用上设置显式的 timeout。

下一步