概述
Supabase 在一个平台中结合了 Postgres、实时订阅和 Edge Functions。这使其成为事件驱动 AI 的理想选择:您的 Edge Function 接收 Webhook,直接将 AI 数据写入 Postgres,然后 Supabase Realtime 将变更推送到已连接的客户端。
您将构建的内容:
- 一个用于接收 ModelRiver Webhooks 的 Supabase Edge Function
- 用于存储 AI 生成内容的数据库架构 (Schema)
- 用于即时前端更新的实时订阅
- 携带增强后的数据库记录回调 ModelRiver
数据库架构 (Schema)
SQL
1-- supabase/migrations/001_ai_content.sql2create table ai_content (3 id uuid default gen_random_uuid() primary key,4 title text not null,5 body text,6 category text default 'general',7 metadata jsonb default '{}',8 channel_id text,9 event_name text,10 source text default 'modelriver',11 created_at timestamptz default now(),12 updated_at timestamptz default now()13);14 15-- 启用实时功能16alter publication supabase_realtime add table ai_content;17 18-- 行级安全性 (RLS)19alter table ai_content enable row level security;20 21create policy "用户可以查看自己的内容"22 on ai_content for select23 using (metadata->>'user_id' = auth.uid()::text);Edge Function Webhook 处理程序
TYPESCRIPT
1// supabase/functions/modelriver-webhook/index.ts2import { serve } from "https://deno.land/[email protected]/http/server.ts";3import { createClient } from "https://esm.sh/@supabase/supabase-js@2";4 5const supabase = createClient(6 Deno.env.get("SUPABASE_URL")!,7 Deno.env.get("SUPABASE_SERVICE_ROLE_KEY")!8);9 10serve(async (req) => {11 if (req.method !== "POST") {12 return new Response("Method not allowed", { status: 405 });13 }14 15 const signature = req.headers.get("mr-signature") ?? "";16 const rawBody = await req.text();17 const webhookSecret = Deno.env.get("MODELRIVER_WEBHOOK_SECRET") ?? "";18 19 // 1. 验证签名(在生产环境中使用完整的 HMAC-SHA256 验证)20 const key = await crypto.subtle.importKey(21 "raw",22 new TextEncoder().encode(webhookSecret),23 { name: "HMAC", hash: "SHA-256" },24 false,25 ["sign"]26 );27 const sig = await crypto.subtle.sign("HMAC", key, new TextEncoder().encode(rawBody));28 const expected = Array.from(new Uint8Array(sig))29 .map((b) => b.toString(16).padStart(2, "0"))30 .join("");31 32 if (expected !== signature) {33 return new Response(JSON.stringify({ error: "Invalid signature" }), { status: 401 });34 }35 36 const payload = JSON.parse(rawBody);37 const { type, event, ai_response, callback_url, customer_data, channel_id } = payload;38 39 // 2. 处理事件驱动的工作流40 if (type === "task.ai_generated" && callback_url) {41 try {42 const aiData = ai_response?.data ?? {};43 44 // 3. 写入 Supabase45 const { data: record, error } = await supabase46 .from("ai_content")47 .insert({48 title: aiData.title ?? "Untitled",49 body: aiData.description ?? aiData.body ?? "",50 category: customer_data?.category ?? "general",51 metadata: {52 ...customer_data,53 ai_model: payload.meta?.model,54 ai_provider: payload.meta?.provider,55 },56 channel_id,57 event_name: event,58 })59 .select()60 .single();61 62 if (error) throw error;63 64 // 4. 携带数据库记录回调 ModelRiver65 const callbackResponse = await fetch(callback_url, {66 method: "POST",67 headers: {68 Authorization: `Bearer ${Deno.env.get("MODELRIVER_API_KEY")}`,69 "Content-Type": "application/json",70 },71 body: JSON.stringify({72 data: {73 ...aiData,74 id: record.id,75 saved_at: record.created_at,76 supabase_url: `${Deno.env.get("SUPABASE_URL")}/rest/v1/ai_content?id=eq.${record.id}`,77 },78 task_id: `supabase_${record.id}`,79 metadata: {80 database: "supabase",81 table: "ai_content",82 record_id: record.id,83 },84 }),85 });86 87 if (!callbackResponse.ok) {88 throw new Error(`回调失败: ${callbackResponse.status}`);89 }90 91 return new Response(JSON.stringify({ received: true }), { status: 200 });92 93 } catch (error) {94 console.error("错误:", error);95 96 // 发送错误回调97 await fetch(callback_url, {98 method: "POST",99 headers: {100 Authorization: `Bearer ${Deno.env.get("MODELRIVER_API_KEY")}`,101 "Content-Type": "application/json",102 },103 body: JSON.stringify({104 error: "processing_failed",105 message: error.message,106 }),107 });108 109 return new Response(JSON.stringify({ received: true }), { status: 200 });110 }111 }112 113 return new Response(JSON.stringify({ received: true }), { status: 200 });114});部署 Edge Function
Bash
supabase functions deploy modelriver-webhook设置环境变量 (Secrets)
Bash
supabase secrets set MODELRIVER_API_KEY=mr_live_YOUR_API_KEYsupabase secrets set MODELRIVER_WEBHOOK_SECRET=your_webhook_secret带有实时功能的前端
使用 Supabase Realtime 订阅实时监听新记录:
TYPESCRIPT
1// React 示例2import { createClient } from "@supabase/supabase-js";3import { useEffect, useState } from "react";4 5const supabase = createClient(6 process.env.NEXT_PUBLIC_SUPABASE_URL!,7 process.env.NEXT_PUBLIC_SUPABASE_ANON_KEY!8);9 10export function AiContentFeed() {11 const [items, setItems] = useState<any[]>([]);12 13 useEffect(() => {14 // 订阅新的 AI 内容15 const channel = supabase16 .channel("ai_content_changes")17 .on(18 "postgres_changes",19 { event: "INSERT", schema: "public", table: "ai_content" },20 (payload) => {21 setItems((prev) => [payload.new, ...prev]);22 }23 )24 .subscribe();25 26 return () => {27 supabase.removeChannel(channel);28 };29 }, []);30 31 return (32 <div>33 {items.map((item) => (34 <div key={item.id} className="p-4 border rounded mb-2">35 <h3 className="font-bold">{item.title}</h3>36 <p>{item.body}</p>37 <span className="text-xs text-zinc-500">{item.created_at}</span>38 </div>39 ))}40 </div>41 );42}最佳实践
- 为 Webhooks 使用 Edge Functions:全球分布式、低延迟的 Webhook 处理。
- 在您的表上启用 Realtime:客户端无需轮询即可获得即时更新。
- 使用行级安全性 (RLS):根据用户身份限制对 AI 生成内容的访问。
- 在 Edge Functions 中使用 Service Role Key:绕过 RLS 进行服务端写入。
- 将元数据存储为 JSONB:灵活的架构,适应各种 AI 响应形式。
下一步
- PlanetScale 事件驱动指南:MySQL 备选方案
- Supabase 向量集成:用于嵌入 (embeddings) 的 pgvector
- 事件驱动 AI 概述:架构和流程