Supabase 的事件驱动 AI

在 Supabase Edge Functions 中接收 AI Webhooks,将结构化数据写入 Postgres,并通过 Supabase Realtime 将结果流式推送给客户端。

概述

Supabase 在一个平台中结合了 Postgres、实时订阅和 Edge Functions。这使其成为事件驱动 AI 的理想选择:您的 Edge Function 接收 Webhook,直接将 AI 数据写入 Postgres,然后 Supabase Realtime 将变更推送到已连接的客户端。

您将构建的内容:

  • 一个用于接收 ModelRiver Webhooks 的 Supabase Edge Function
  • 用于存储 AI 生成内容的数据库架构 (Schema)
  • 用于即时前端更新的实时订阅
  • 携带增强后的数据库记录回调 ModelRiver

数据库架构 (Schema)

SQL
1-- supabase/migrations/001_ai_content.sql
2create table ai_content (
3 id uuid default gen_random_uuid() primary key,
4 title text not null,
5 body text,
6 category text default 'general',
7 metadata jsonb default '{}',
8 channel_id text,
9 event_name text,
10 source text default 'modelriver',
11 created_at timestamptz default now(),
12 updated_at timestamptz default now()
13);
14 
15--
16alter publication supabase_realtime add table ai_content;
17 
18-- (RLS)
19alter table ai_content enable row level security;
20 
21create policy "用户可以查看自己的内容"
22 on ai_content for select
23 using (metadata->>'user_id' = auth.uid()::text);

Edge Function Webhook 处理程序

TYPESCRIPT
1// supabase/functions/modelriver-webhook/index.ts
2import { serve } from "https://deno.land/[email protected]/http/server.ts";
3import { createClient } from "https://esm.sh/@supabase/supabase-js@2";
4 
5const supabase = createClient(
6 Deno.env.get("SUPABASE_URL")!,
7 Deno.env.get("SUPABASE_SERVICE_ROLE_KEY")!
8);
9 
10serve(async (req) => {
11 if (req.method !== "POST") {
12 return new Response("Method not allowed", { status: 405 });
13 }
14 
15 const signature = req.headers.get("mr-signature") ?? "";
16 const rawBody = await req.text();
17 const webhookSecret = Deno.env.get("MODELRIVER_WEBHOOK_SECRET") ?? "";
18 
19 // 1. 验证签名(在生产环境中使用完整的 HMAC-SHA256 验证)
20 const key = await crypto.subtle.importKey(
21 "raw",
22 new TextEncoder().encode(webhookSecret),
23 { name: "HMAC", hash: "SHA-256" },
24 false,
25 ["sign"]
26 );
27 const sig = await crypto.subtle.sign("HMAC", key, new TextEncoder().encode(rawBody));
28 const expected = Array.from(new Uint8Array(sig))
29 .map((b) => b.toString(16).padStart(2, "0"))
30 .join("");
31 
32 if (expected !== signature) {
33 return new Response(JSON.stringify({ error: "Invalid signature" }), { status: 401 });
34 }
35 
36 const payload = JSON.parse(rawBody);
37 const { type, event, ai_response, callback_url, customer_data, channel_id } = payload;
38 
39 // 2. 处理事件驱动的工作流
40 if (type === "task.ai_generated" && callback_url) {
41 try {
42 const aiData = ai_response?.data ?? {};
43 
44 // 3. 写入 Supabase
45 const { data: record, error } = await supabase
46 .from("ai_content")
47 .insert({
48 title: aiData.title ?? "Untitled",
49 body: aiData.description ?? aiData.body ?? "",
50 category: customer_data?.category ?? "general",
51 metadata: {
52 ...customer_data,
53 ai_model: payload.meta?.model,
54 ai_provider: payload.meta?.provider,
55 },
56 channel_id,
57 event_name: event,
58 })
59 .select()
60 .single();
61 
62 if (error) throw error;
63 
64 // 4. 携带数据库记录回调 ModelRiver
65 const callbackResponse = await fetch(callback_url, {
66 method: "POST",
67 headers: {
68 Authorization: `Bearer ${Deno.env.get("MODELRIVER_API_KEY")}`,
69 "Content-Type": "application/json",
70 },
71 body: JSON.stringify({
72 data: {
73 ...aiData,
74 id: record.id,
75 saved_at: record.created_at,
76 supabase_url: `${Deno.env.get("SUPABASE_URL")}/rest/v1/ai_content?id=eq.${record.id}`,
77 },
78 task_id: `supabase_${record.id}`,
79 metadata: {
80 database: "supabase",
81 table: "ai_content",
82 record_id: record.id,
83 },
84 }),
85 });
86 
87 if (!callbackResponse.ok) {
88 throw new Error(`回调失败: ${callbackResponse.status}`);
89 }
90 
91 return new Response(JSON.stringify({ received: true }), { status: 200 });
92 
93 } catch (error) {
94 console.error("错误:", error);
95 
96 // 发送错误回调
97 await fetch(callback_url, {
98 method: "POST",
99 headers: {
100 Authorization: `Bearer ${Deno.env.get("MODELRIVER_API_KEY")}`,
101 "Content-Type": "application/json",
102 },
103 body: JSON.stringify({
104 error: "processing_failed",
105 message: error.message,
106 }),
107 });
108 
109 return new Response(JSON.stringify({ received: true }), { status: 200 });
110 }
111 }
112 
113 return new Response(JSON.stringify({ received: true }), { status: 200 });
114});

部署 Edge Function

Bash
supabase functions deploy modelriver-webhook

设置环境变量 (Secrets)

Bash
supabase secrets set MODELRIVER_API_KEY=mr_live_YOUR_API_KEY
supabase secrets set MODELRIVER_WEBHOOK_SECRET=your_webhook_secret

带有实时功能的前端

使用 Supabase Realtime 订阅实时监听新记录:

TYPESCRIPT
1// React 示例
2import { createClient } from "@supabase/supabase-js";
3import { useEffect, useState } from "react";
4 
5const supabase = createClient(
6 process.env.NEXT_PUBLIC_SUPABASE_URL!,
7 process.env.NEXT_PUBLIC_SUPABASE_ANON_KEY!
8);
9 
10export function AiContentFeed() {
11 const [items, setItems] = useState<any[]>([]);
12 
13 useEffect(() => {
14 // 订阅新的 AI 内容
15 const channel = supabase
16 .channel("ai_content_changes")
17 .on(
18 "postgres_changes",
19 { event: "INSERT", schema: "public", table: "ai_content" },
20 (payload) => {
21 setItems((prev) => [payload.new, ...prev]);
22 }
23 )
24 .subscribe();
25 
26 return () => {
27 supabase.removeChannel(channel);
28 };
29 }, []);
30 
31 return (
32 <div>
33 {items.map((item) => (
34 <div key={item.id} className="p-4 border rounded mb-2">
35 <h3 className="font-bold">{item.title}</h3>
36 <p>{item.body}</p>
37 <span className="text-xs text-zinc-500">{item.created_at}</span>
38 </div>
39 ))}
40 </div>
41 );
42}

最佳实践

  1. 为 Webhooks 使用 Edge Functions:全球分布式、低延迟的 Webhook 处理。
  2. 在您的表上启用 Realtime:客户端无需轮询即可获得即时更新。
  3. 使用行级安全性 (RLS):根据用户身份限制对 AI 生成内容的访问。
  4. 在 Edge Functions 中使用 Service Role Key:绕过 RLS 进行服务端写入。
  5. 将元数据存储为 JSONB:灵活的架构,适应各种 AI 响应形式。

下一步