概述
Convex 是一个响应式数据库平台,当数据发生变化时,查询会自动更新。将其与 ModelRiver 事件驱动 AI 结合,您将获得一个端到端的响应式管道:AI 生成数据 → Webhook 写入 Convex → 您的 React 组件立即更新。
您将构建的内容:
- 一个用于接收 ModelRiver Webhooks 的 Convex HTTP Action
- 一个用于写入 AI 生成数据的 Mutation
- 能够自动更新 UI 的实时查询 (Real-time queries)
- 带有记录详情的回调 ModelRiver
架构 (Schema)
TYPESCRIPT
1// convex/schema.ts2import { defineSchema, defineTable } from "convex/server";3import { v } from "convex/values";4 5export default defineSchema({6 aiContent: defineTable({7 title: v.string(),8 body: v.optional(v.string()),9 category: v.string(),10 metadata: v.any(),11 channelId: v.optional(v.string()),12 eventName: v.optional(v.string()),13 source: v.string(),14 })15 .index("by_channel", ["channelId"])16 .index("by_event", ["eventName"])17 .index("by_creation", ["_creationTime"]),18});Mutation
TYPESCRIPT
1// convex/aiContent.ts2import { mutation, query } from "./_generated/server";3import { v } from "convex/values";4 5export const insert = mutation({6 args: {7 title: v.string(),8 body: v.optional(v.string()),9 category: v.string(),10 metadata: v.any(),11 channelId: v.optional(v.string()),12 eventName: v.optional(v.string()),13 },14 handler: async (ctx, args) => {15 const id = await ctx.db.insert("aiContent", {16 ...args,17 source: "modelriver",18 });19 return id;20 },21});22 23export const list = query({24 args: {25 limit: v.optional(v.number()),26 },27 handler: async (ctx, args) => {28 return await ctx.db29 .query("aiContent")30 .order("desc")31 .take(args.limit ?? 20);32 },33});34 35export const getByChannel = query({36 args: {37 channelId: v.string(),38 },39 handler: async (ctx, args) => {40 return await ctx.db41 .query("aiContent")42 .withIndex("by_channel", (q) => q.eq("channelId", args.channelId))43 .first();44 },45});HTTP Action (Webhook 处理程序)
TYPESCRIPT
1// convex/http.ts2import { httpRouter } from "convex/server";3import { httpAction } from "./_generated/server";4import { api } from "./_generated/api";5 6const http = httpRouter();7 8http.route({9 path: "/webhooks/modelriver",10 method: "POST",11 handler: httpAction(async (ctx, request) => {12 const signature = request.headers.get("mr-signature") ?? "";13 const rawBody = await request.text();14 15 // 1. 验证签名16 const webhookSecret = process.env.MODELRIVER_WEBHOOK_SECRET ?? "";17 const encoder = new TextEncoder();18 const key = await crypto.subtle.importKey(19 "raw",20 encoder.encode(webhookSecret),21 { name: "HMAC", hash: "SHA-256" },22 false,23 ["sign"]24 );25 const sig = await crypto.subtle.sign("HMAC", key, encoder.encode(rawBody));26 const expected = Array.from(new Uint8Array(sig))27 .map((b) => b.toString(16).padStart(2, "0"))28 .join("");29 30 if (expected !== signature) {31 return new Response(JSON.stringify({ error: "Invalid signature" }), {32 status: 401,33 headers: { "Content-Type": "application/json" },34 });35 }36 37 const payload = JSON.parse(rawBody);38 const { type, event, ai_response, callback_url, customer_data, channel_id } = payload;39 40 // 2. 处理事件驱动的工作流41 if (type === "task.ai_generated" && callback_url) {42 try {43 const aiData = ai_response?.data ?? {};44 45 // 3. 通过 Mutation 写入 Convex 数据库46 const recordId = await ctx.runMutation(api.aiContent.insert, {47 title: aiData.title ?? "Untitled",48 body: aiData.description ?? aiData.body ?? "",49 category: customer_data?.category ?? "general",50 metadata: {51 ...customer_data,52 ai_model: payload.meta?.model,53 ai_provider: payload.meta?.provider,54 },55 channelId: channel_id,56 eventName: event,57 });58 59 // 4. 回调 ModelRiver60 const callbackResponse = await fetch(callback_url, {61 method: "POST",62 headers: {63 Authorization: `Bearer ${process.env.MODELRIVER_API_KEY}`,64 "Content-Type": "application/json",65 },66 body: JSON.stringify({67 data: {68 ...aiData,69 id: recordId,70 saved_at: new Date().toISOString(),71 },72 task_id: `convex_${recordId}`,73 metadata: {74 database: "convex",75 table: "aiContent",76 record_id: recordId,77 },78 }),79 });80 81 if (!callbackResponse.ok) {82 throw new Error(`回调失败: ${callbackResponse.status}`);83 }84 85 return new Response(JSON.stringify({ received: true }), {86 status: 200,87 headers: { "Content-Type": "application/json" },88 });89 90 } catch (error: any) {91 console.error("错误:", error);92 93 await fetch(callback_url, {94 method: "POST",95 headers: {96 Authorization: `Bearer ${process.env.MODELRIVER_API_KEY}`,97 "Content-Type": "application/json",98 },99 body: JSON.stringify({100 error: "processing_failed",101 message: error.message,102 }),103 });104 105 return new Response(JSON.stringify({ received: true }), {106 status: 200,107 headers: { "Content-Type": "application/json" },108 });109 }110 }111 112 return new Response(JSON.stringify({ received: true }), {113 status: 200,114 headers: { "Content-Type": "application/json" },115 });116 }),117});118 119export default http;React 前端 (实时)
Convex 的查询是响应式的:当 Webhook 写入新数据时,UI 会自动更新:
TSX
1// src/App.tsx2import { useQuery } from "convex/react";3import { api } from "../convex/_generated/api";4 5export function AiContentFeed() {6 // 当插入新记录时,此查询会自动更新!7 const items = useQuery(api.aiContent.list, { limit: 10 });8 9 if (items === undefined) return <p>正在加载...</p>;10 11 return (12 <div className="space-y-4">13 {items.map((item) => (14 <div key={item._id} className="p-4 border rounded-lg">15 <h3 className="font-bold text-lg">{item.title}</h3>16 <p className="text-zinc-600">{item.body}</p>17 <div className="flex gap-2 mt-2">18 <span className="text-xs bg-blue-100 text-blue-800 px-2 py-1 rounded">19 {item.category}20 </span>21 <span className="text-xs text-zinc-400">22 {new Date(item._creationTime).toLocaleString()}23 </span>24 </div>25 </div>26 ))}27 </div>28 );29}无需 WebSocket 配置、无需轮询、无需订阅:Convex 会自动处理响应性。
最佳实践
- 为 Webhooks 使用 HTTP Actions:Convex HTTP actions 可以接收外部 HTTP 请求。
- 为写入使用 Mutations:所有的数据库写入都通过经过验证的 mutations 进行。
- 利用响应式查询 (Reactive Queries):无需独立的 WebSocket/轮询基础设施。
- 使用 Web Crypto API:Convex HTTP actions 运行在内置 Web Crypto 的 V8 环境中。
- 存储环境变量:使用 Convex 控制面板中的设置来存储 API 密钥和密钥。
下一步
- 返回 无服务器数据库:所有数据库指南
- 事件驱动 AI 概述:架构和流程
- Webhooks 参考:重试政策和投递监控