概述
PlanetScale 提供了由 Vitess 驱动的无服务器 MySQL 兼容数据库。其分支模型 (Branching model) 让您可以安全地迭代数据库架构,而无服务器驱动程序则可以在 Edge 环境中原生运行:这使其成为 Webhook 驱动的 AI 数据管道的理想选择。
您将构建的内容:
- 一个将 AI 数据写入 PlanetScale 的 Webhook 处理程序
- 用于存储 AI 生成内容的数据库架构 (Schema)
- 适用于 Edge/Serverless 环境的无服务器驱动程序用法
- 携带增强后的记录回调 ModelRiver
数据库架构 (Schema)
SQL
1-- 通过 PlanetScale 控制台或 CLI 创建2CREATE TABLE ai_content (3 id BIGINT AUTO_INCREMENT PRIMARY KEY,4 title VARCHAR(500) NOT NULL,5 body TEXT,6 category VARCHAR(100) DEFAULT 'general',7 metadata JSON,8 channel_id VARCHAR(36),9 event_name VARCHAR(100),10 created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,11 updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,12 INDEX idx_channel (channel_id),13 INDEX idx_event (event_name)14);Webhook 处理程序 (Node.js)
TYPESCRIPT
1// 使用 @planetscale/database 无服务器驱动程序2import { connect } from "@planetscale/database";3import crypto from "crypto";4 5const db = connect({6 host: process.env.PLANETSCALE_HOST,7 username: process.env.PLANETSCALE_USERNAME,8 password: process.env.PLANETSCALE_PASSWORD,9});10 11function verifySignature(payload: string, signature: string, secret: string): boolean {12 const expected = crypto13 .createHmac("sha256", secret)14 .update(payload)15 .digest("hex");16 return crypto.timingSafeEqual(Buffer.from(signature), Buffer.from(expected));17}18 19export async function handleWebhook(req: Request): Promise<Response> {20 const signature = req.headers.get("mr-signature") ?? "";21 const rawBody = await req.text();22 23 // 1. 验证签名24 if (!signature || !verifySignature(rawBody, signature, process.env.MODELRIVER_WEBHOOK_SECRET!)) {25 return new Response(JSON.stringify({ error: "Invalid signature" }), { status: 401 });26 }27 28 const payload = JSON.parse(rawBody);29 const { type, event, ai_response, callback_url, customer_data, channel_id } = payload;30 31 // 2. 处理事件驱动的工作流32 if (type === "task.ai_generated" && callback_url) {33 try {34 const aiData = ai_response?.data ?? {};35 36 // 3. 写入 PlanetScale37 const result = await db.execute(38 `INSERT INTO ai_content (title, body, category, metadata, channel_id, event_name)39 VALUES (?, ?, ?, ?, ?, ?)`,40 [41 aiData.title ?? "Untitled",42 aiData.description ?? aiData.body ?? "",43 customer_data?.category ?? "general",44 JSON.stringify({45 ...customer_data,46 ai_model: payload.meta?.model,47 ai_provider: payload.meta?.provider,48 }),49 channel_id,50 event,51 ]52 );53 54 const recordId = result.insertId;55 56 // 4. 回调 ModelRiver57 await fetch(callback_url, {58 method: "POST",59 headers: {60 Authorization: `Bearer ${process.env.MODELRIVER_API_KEY}`,61 "Content-Type": "application/json",62 },63 body: JSON.stringify({64 data: {65 ...aiData,66 id: recordId,67 saved_at: new Date().toISOString(),68 },69 task_id: `planetscale_${recordId}`,70 metadata: {71 database: "planetscale",72 table: "ai_content",73 record_id: recordId,74 },75 }),76 });77 78 return new Response(JSON.stringify({ received: true }), { status: 200 });79 80 } catch (error: any) {81 console.error("错误:", error);82 83 await fetch(callback_url, {84 method: "POST",85 headers: {86 Authorization: `Bearer ${process.env.MODELRIVER_API_KEY}`,87 "Content-Type": "application/json",88 },89 body: JSON.stringify({90 error: "processing_failed",91 message: error.message,92 }),93 });94 95 return new Response(JSON.stringify({ received: true }), { status: 200 });96 }97 }98 99 return new Response(JSON.stringify({ received: true }), { status: 200 });100}Next.js API 路由示例
TYPESCRIPT
1// app/api/webhooks/modelriver/route.ts2import { NextRequest, NextResponse } from "next/server";3import { connect } from "@planetscale/database";4 5const db = connect({6 url: process.env.DATABASE_URL, // PlanetScale 连接字符串7});8 9export async function POST(request: NextRequest) {10 // ... 签名验证(见上文) ...11 12 const payload = await request.json();13 14 if (payload.type === "task.ai_generated" && payload.callback_url) {15 const aiData = payload.ai_response?.data ?? {};16 17 const result = await db.execute(18 "INSERT INTO ai_content (title, body, metadata) VALUES (?, ?, ?)",19 [aiData.title, aiData.description, JSON.stringify(payload.customer_data)]20 );21 22 await fetch(payload.callback_url, {23 method: "POST",24 headers: {25 Authorization: `Bearer ${process.env.MODELRIVER_API_KEY}`,26 "Content-Type": "application/json",27 },28 body: JSON.stringify({29 data: { ...aiData, id: result.insertId },30 task_id: `ps_${result.insertId}`,31 }),32 });33 34 return NextResponse.json({ received: true });35 }36 37 return NextResponse.json({ received: true });38}架构分支 (Branching)
PlanetScale 的分支模型让您可以安全地演进 AI 内容架构:
Bash
# 创建开发分支pscale branch create my-db ai-content-v2 # 添加新列pscale shell my-db ai-content-v2> ALTER TABLE ai_content ADD COLUMN embedding_id VARCHAR(100);> ALTER TABLE ai_content ADD COLUMN quality_score FLOAT; # 创建部署请求pscale deploy-request create my-db ai-content-v2 # 审核后进行部署pscale deploy-request deploy my-db <id>最佳实践
- 使用无服务器驱动程序:
@planetscale/database可以在 Edge/Serverless 环境中运行。 - 使用参数化查询:始终使用
?占位符以防止 SQL 注入。 - 将元数据存储为 JSON:灵活的架构,适应各种 AI 响应形式。
- 使用数据库分支:在部署到生产环境之前,使用 AI 数据测试架构变更。
- 在
channel_id上建立索引:实现快速的事件追踪查询。
下一步
- Neon 事件驱动指南:Postgres 备选方案
- 事件驱动 AI 概述:架构和流程
- Webhooks 参考:重试政策和投递监控