概述
Neon 提供了具有分支功能、可自动缩放至零以及支持 WebSocket 的无服务器 Postgres。其无服务器驱动程序可以在 Edge 环境中运行。通过将其与事件驱动 AI 结合使用,您可以构建完全无服务器的数据管道,让 AI 生成的数据直接落入 Postgres 数据库。
您将构建的内容:
- 一个使用 Neon 无服务器驱动程序的 Webhook 处理程序
- 用于存储 AI 生成内容的数据库架构 (Schema)
- 适用于生产负载的连接池 (Connection pooling) 配置
- 携带增强后的记录回调 ModelRiver
数据库架构 (Schema)
SQL
1-- 通过 Neon 控制台或 psql 创建2CREATE TABLE ai_content (3 id UUID DEFAULT gen_random_uuid() PRIMARY KEY,4 title TEXT NOT NULL,5 body TEXT,6 category TEXT DEFAULT 'general',7 metadata JSONB DEFAULT '{}',8 channel_id TEXT,9 event_name TEXT,10 created_at TIMESTAMPTZ DEFAULT now(),11 updated_at TIMESTAMPTZ DEFAULT now()12);13 14CREATE INDEX idx_ai_content_channel ON ai_content(channel_id);15CREATE INDEX idx_ai_content_event ON ai_content(event_name);16CREATE INDEX idx_ai_content_created ON ai_content(created_at DESC);Webhook 处理程序 (Node.js)
TYPESCRIPT
1// 使用 @neondatabase/serverless 驱动程序2import { neon } from "@neondatabase/serverless";3import crypto from "crypto";4 5const sql = neon(process.env.DATABASE_URL!);6 7function verifySignature(payload: string, signature: string, secret: string): boolean {8 const expected = crypto9 .createHmac("sha256", secret)10 .update(payload)11 .digest("hex");12 return crypto.timingSafeEqual(Buffer.from(signature), Buffer.from(expected));13}14 15export async function handleWebhook(req: Request): Promise<Response> {16 const signature = req.headers.get("mr-signature") ?? "";17 const rawBody = await req.text();18 19 // 1. 验证签名20 if (!signature || !verifySignature(rawBody, signature, process.env.MODELRIVER_WEBHOOK_SECRET!)) {21 return new Response(JSON.stringify({ error: "Invalid signature" }), { status: 401 });22 }23 24 const payload = JSON.parse(rawBody);25 const { type, event, ai_response, callback_url, customer_data, channel_id } = payload;26 27 // 2. 处理事件驱动的工作流28 if (type === "task.ai_generated" && callback_url) {29 try {30 const aiData = ai_response?.data ?? {};31 32 // 3. 写入 Neon33 const [record] = await sql`34 INSERT INTO ai_content (title, body, category, metadata, channel_id, event_name)35 VALUES (36 ${aiData.title ?? "Untitled"},37 ${aiData.description ?? aiData.body ?? ""},38 ${customer_data?.category ?? "general"},39 ${JSON.stringify({40 ...customer_data,41 ai_model: payload.meta?.model,42 ai_provider: payload.meta?.provider,43 })}::jsonb,44 ${channel_id},45 ${event}46 )47 RETURNING id, created_at48 `;49 50 // 4. 回调 ModelRiver51 await fetch(callback_url, {52 method: "POST",53 headers: {54 Authorization: `Bearer ${process.env.MODELRIVER_API_KEY}`,55 "Content-Type": "application/json",56 },57 body: JSON.stringify({58 data: {59 ...aiData,60 id: record.id,61 saved_at: record.created_at,62 },63 task_id: `neon_${record.id}`,64 metadata: {65 database: "neon",66 table: "ai_content",67 record_id: record.id,68 },69 }),70 });71 72 return new Response(JSON.stringify({ received: true }), { status: 200 });73 74 } catch (error: any) {75 console.error("错误:", error);76 77 await fetch(callback_url, {78 method: "POST",79 headers: {80 Authorization: `Bearer ${process.env.MODELRIVER_API_KEY}`,81 "Content-Type": "application/json",82 },83 body: JSON.stringify({84 error: "processing_failed",85 message: error.message,86 }),87 });88 89 return new Response(JSON.stringify({ received: true }), { status: 200 });90 }91 }92 93 return new Response(JSON.stringify({ received: true }), { status: 200 });94}Vercel Edge Function 示例
Neon 的无服务器驱动程序可以在 Edge 环境中原生运行:
TYPESCRIPT
1// app/api/webhooks/modelriver/route.ts2import { NextRequest, NextResponse } from "next/server";3import { neon } from "@neondatabase/serverless";4 5export const runtime = "edge";6 7const sql = neon(process.env.DATABASE_URL!);8 9export async function POST(request: NextRequest) {10 // ... 签名验证 ...11 12 const payload = await request.json();13 14 if (payload.type === "task.ai_generated" && payload.callback_url) {15 const aiData = payload.ai_response?.data ?? {};16 17 const [record] = await sql`18 INSERT INTO ai_content (title, body, metadata)19 VALUES (${aiData.title}, ${aiData.description}, ${JSON.stringify(payload.customer_data)}::jsonb)20 RETURNING id, created_at21 `;22 23 await fetch(payload.callback_url, {24 method: "POST",25 headers: {26 Authorization: `Bearer ${process.env.MODELRIVER_API_KEY}`,27 "Content-Type": "application/json",28 },29 body: JSON.stringify({30 data: { ...aiData, id: record.id, saved_at: record.created_at },31 task_id: `neon_${record.id}`,32 }),33 });34 35 return NextResponse.json({ received: true });36 }37 38 return NextResponse.json({ received: true });39}Drizzle ORM 集成
用于类型安全的数据库访问:
TYPESCRIPT
1// db/schema.ts2import { pgTable, uuid, text, jsonb, timestamp } from "drizzle-orm/pg-core";3 4export const aiContent = pgTable("ai_content", {5 id: uuid("id").defaultRandom().primaryKey(),6 title: text("title").notNull(),7 body: text("body"),8 category: text("category").default("general"),9 metadata: jsonb("metadata").default({}),10 channelId: text("channel_id"),11 eventName: text("event_name"),12 createdAt: timestamp("created_at", { withTimezone: true }).defaultNow(),13 updatedAt: timestamp("updated_at", { withTimezone: true }).defaultNow(),14});15 16// 在您的 Webhook 处理程序中17import { drizzle } from "drizzle-orm/neon-http";18import { neon } from "@neondatabase/serverless";19import { aiContent } from "./db/schema";20 21const sql = neon(process.env.DATABASE_URL!);22const db = drizzle(sql);23 24const [record] = await db25 .insert(aiContent)26 .values({27 title: aiData.title,28 body: aiData.description,29 category: customerData?.category,30 metadata: customerData,31 channelId,32 eventName: event,33 })34 .returning();数据库分支 (Branching)
在不影响生产环境的情况下测试 AI 内容架构:
Bash
# 创建一个分支neonctl branches create --name ai-content-v2 # 获取分支的连接字符串neonctl connection-string ai-content-v2 # 针对该分支测试您的 Webhook 处理程序MODELRIVER_WEBHOOK_SECRET=... DATABASE_URL=<branch-url> npm run dev # 满意后进行合并最佳实践
- 使用无服务器驱动程序:
@neondatabase/serverless使用 WebSocket 以实现 Edge 兼容性。 - 使用带标签的模板字面量:Neon 的
sql函数会自动防止 SQL 注入。 - 将元数据用 JSONB 存储:灵活的架构,适应各种 AI 响应形式。
- 使用数据库分支:在投入生产之前安全地测试架构变更。
- 使用 Drizzle ORM:类型安全查询,并支持自动架构推断。
下一步
- Convex 事件驱动指南:响应式数据库备选方案
- 事件驱动 AI 概述:架构和流程
- Webhooks 参考:重试政策和投递监控