Neon 的事件驱动 AI

接收 AI Webhooks 并将结构化数据写入 Neon 的无服务器 Postgres:利用分支功能、自动缩放和原生无服务器驱动程序。

概述

Neon 提供了具有分支功能、可自动缩放至零以及支持 WebSocket 的无服务器 Postgres。其无服务器驱动程序可以在 Edge 环境中运行。通过将其与事件驱动 AI 结合使用,您可以构建完全无服务器的数据管道,让 AI 生成的数据直接落入 Postgres 数据库。

您将构建的内容:

  • 一个使用 Neon 无服务器驱动程序的 Webhook 处理程序
  • 用于存储 AI 生成内容的数据库架构 (Schema)
  • 适用于生产负载的连接池 (Connection pooling) 配置
  • 携带增强后的记录回调 ModelRiver

数据库架构 (Schema)

SQL
1-- Neon psql
2CREATE TABLE ai_content (
3 id UUID DEFAULT gen_random_uuid() PRIMARY KEY,
4 title TEXT NOT NULL,
5 body TEXT,
6 category TEXT DEFAULT 'general',
7 metadata JSONB DEFAULT '{}',
8 channel_id TEXT,
9 event_name TEXT,
10 created_at TIMESTAMPTZ DEFAULT now(),
11 updated_at TIMESTAMPTZ DEFAULT now()
12);
13 
14CREATE INDEX idx_ai_content_channel ON ai_content(channel_id);
15CREATE INDEX idx_ai_content_event ON ai_content(event_name);
16CREATE INDEX idx_ai_content_created ON ai_content(created_at DESC);

Webhook 处理程序 (Node.js)

TYPESCRIPT
1// 使用 @neondatabase/serverless 驱动程序
2import { neon } from "@neondatabase/serverless";
3import crypto from "crypto";
4 
5const sql = neon(process.env.DATABASE_URL!);
6 
7function verifySignature(payload: string, signature: string, secret: string): boolean {
8 const expected = crypto
9 .createHmac("sha256", secret)
10 .update(payload)
11 .digest("hex");
12 return crypto.timingSafeEqual(Buffer.from(signature), Buffer.from(expected));
13}
14 
15export async function handleWebhook(req: Request): Promise<Response> {
16 const signature = req.headers.get("mr-signature") ?? "";
17 const rawBody = await req.text();
18 
19 // 1. 验证签名
20 if (!signature || !verifySignature(rawBody, signature, process.env.MODELRIVER_WEBHOOK_SECRET!)) {
21 return new Response(JSON.stringify({ error: "Invalid signature" }), { status: 401 });
22 }
23 
24 const payload = JSON.parse(rawBody);
25 const { type, event, ai_response, callback_url, customer_data, channel_id } = payload;
26 
27 // 2. 处理事件驱动的工作流
28 if (type === "task.ai_generated" && callback_url) {
29 try {
30 const aiData = ai_response?.data ?? {};
31 
32 // 3. 写入 Neon
33 const [record] = await sql`
34 INSERT INTO ai_content (title, body, category, metadata, channel_id, event_name)
35 VALUES (
36 ${aiData.title ?? "Untitled"},
37 ${aiData.description ?? aiData.body ?? ""},
38 ${customer_data?.category ?? "general"},
39 ${JSON.stringify({
40 ...customer_data,
41 ai_model: payload.meta?.model,
42 ai_provider: payload.meta?.provider,
43 })}::jsonb,
44 ${channel_id},
45 ${event}
46 )
47 RETURNING id, created_at
48 `;
49 
50 // 4. 回调 ModelRiver
51 await fetch(callback_url, {
52 method: "POST",
53 headers: {
54 Authorization: `Bearer ${process.env.MODELRIVER_API_KEY}`,
55 "Content-Type": "application/json",
56 },
57 body: JSON.stringify({
58 data: {
59 ...aiData,
60 id: record.id,
61 saved_at: record.created_at,
62 },
63 task_id: `neon_${record.id}`,
64 metadata: {
65 database: "neon",
66 table: "ai_content",
67 record_id: record.id,
68 },
69 }),
70 });
71 
72 return new Response(JSON.stringify({ received: true }), { status: 200 });
73 
74 } catch (error: any) {
75 console.error("错误:", error);
76 
77 await fetch(callback_url, {
78 method: "POST",
79 headers: {
80 Authorization: `Bearer ${process.env.MODELRIVER_API_KEY}`,
81 "Content-Type": "application/json",
82 },
83 body: JSON.stringify({
84 error: "processing_failed",
85 message: error.message,
86 }),
87 });
88 
89 return new Response(JSON.stringify({ received: true }), { status: 200 });
90 }
91 }
92 
93 return new Response(JSON.stringify({ received: true }), { status: 200 });
94}

Vercel Edge Function 示例

Neon 的无服务器驱动程序可以在 Edge 环境中原生运行:

TYPESCRIPT
1// app/api/webhooks/modelriver/route.ts
2import { NextRequest, NextResponse } from "next/server";
3import { neon } from "@neondatabase/serverless";
4 
5export const runtime = "edge";
6 
7const sql = neon(process.env.DATABASE_URL!);
8 
9export async function POST(request: NextRequest) {
10 // ... 签名验证 ...
11 
12 const payload = await request.json();
13 
14 if (payload.type === "task.ai_generated" && payload.callback_url) {
15 const aiData = payload.ai_response?.data ?? {};
16 
17 const [record] = await sql`
18 INSERT INTO ai_content (title, body, metadata)
19 VALUES (${aiData.title}, ${aiData.description}, ${JSON.stringify(payload.customer_data)}::jsonb)
20 RETURNING id, created_at
21 `;
22 
23 await fetch(payload.callback_url, {
24 method: "POST",
25 headers: {
26 Authorization: `Bearer ${process.env.MODELRIVER_API_KEY}`,
27 "Content-Type": "application/json",
28 },
29 body: JSON.stringify({
30 data: { ...aiData, id: record.id, saved_at: record.created_at },
31 task_id: `neon_${record.id}`,
32 }),
33 });
34 
35 return NextResponse.json({ received: true });
36 }
37 
38 return NextResponse.json({ received: true });
39}

Drizzle ORM 集成

用于类型安全的数据库访问:

TYPESCRIPT
1// db/schema.ts
2import { pgTable, uuid, text, jsonb, timestamp } from "drizzle-orm/pg-core";
3 
4export const aiContent = pgTable("ai_content", {
5 id: uuid("id").defaultRandom().primaryKey(),
6 title: text("title").notNull(),
7 body: text("body"),
8 category: text("category").default("general"),
9 metadata: jsonb("metadata").default({}),
10 channelId: text("channel_id"),
11 eventName: text("event_name"),
12 createdAt: timestamp("created_at", { withTimezone: true }).defaultNow(),
13 updatedAt: timestamp("updated_at", { withTimezone: true }).defaultNow(),
14});
15 
16// 在您的 Webhook 处理程序中
17import { drizzle } from "drizzle-orm/neon-http";
18import { neon } from "@neondatabase/serverless";
19import { aiContent } from "./db/schema";
20 
21const sql = neon(process.env.DATABASE_URL!);
22const db = drizzle(sql);
23 
24const [record] = await db
25 .insert(aiContent)
26 .values({
27 title: aiData.title,
28 body: aiData.description,
29 category: customerData?.category,
30 metadata: customerData,
31 channelId,
32 eventName: event,
33 })
34 .returning();

数据库分支 (Branching)

在不影响生产环境的情况下测试 AI 内容架构:

Bash
# 创建一个分支
neonctl branches create --name ai-content-v2
 
# 获取分支的连接字符串
neonctl connection-string ai-content-v2
 
# 针对该分支测试您的 Webhook 处理程序
MODELRIVER_WEBHOOK_SECRET=... DATABASE_URL=<branch-url> npm run dev
 
# 满意后进行合并

最佳实践

  1. 使用无服务器驱动程序@neondatabase/serverless 使用 WebSocket 以实现 Edge 兼容性。
  2. 使用带标签的模板字面量:Neon 的 sql 函数会自动防止 SQL 注入。
  3. 将元数据用 JSONB 存储:灵活的架构,适应各种 AI 响应形式。
  4. 使用数据库分支:在投入生产之前安全地测试架构变更。
  5. 使用 Drizzle ORM:类型安全查询,并支持自动架构推断。

下一步