PlanetScale 的事件驱动 AI

接收 AI Webhooks 并将结构化数据直接写入 PlanetScale 的无服务器 MySQL:利用分支功能实现安全的架构演进。

概述

PlanetScale 提供了由 Vitess 驱动的无服务器 MySQL 兼容数据库。其分支模型 (Branching model) 让您可以安全地迭代数据库架构,而无服务器驱动程序则可以在 Edge 环境中原生运行:这使其成为 Webhook 驱动的 AI 数据管道的理想选择。

您将构建的内容:

  • 一个将 AI 数据写入 PlanetScale 的 Webhook 处理程序
  • 用于存储 AI 生成内容的数据库架构 (Schema)
  • 适用于 Edge/Serverless 环境的无服务器驱动程序用法
  • 携带增强后的记录回调 ModelRiver

数据库架构 (Schema)

SQL
1-- PlanetScale CLI
2CREATE TABLE ai_content (
3 id BIGINT AUTO_INCREMENT PRIMARY KEY,
4 title VARCHAR(500) NOT NULL,
5 body TEXT,
6 category VARCHAR(100) DEFAULT 'general',
7 metadata JSON,
8 channel_id VARCHAR(36),
9 event_name VARCHAR(100),
10 created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
11 updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
12 INDEX idx_channel (channel_id),
13 INDEX idx_event (event_name)
14);

Webhook 处理程序 (Node.js)

TYPESCRIPT
1// 使用 @planetscale/database 无服务器驱动程序
2import { connect } from "@planetscale/database";
3import crypto from "crypto";
4 
5const db = connect({
6 host: process.env.PLANETSCALE_HOST,
7 username: process.env.PLANETSCALE_USERNAME,
8 password: process.env.PLANETSCALE_PASSWORD,
9});
10 
11function verifySignature(payload: string, signature: string, secret: string): boolean {
12 const expected = crypto
13 .createHmac("sha256", secret)
14 .update(payload)
15 .digest("hex");
16 return crypto.timingSafeEqual(Buffer.from(signature), Buffer.from(expected));
17}
18 
19export async function handleWebhook(req: Request): Promise<Response> {
20 const signature = req.headers.get("mr-signature") ?? "";
21 const rawBody = await req.text();
22 
23 // 1. 验证签名
24 if (!signature || !verifySignature(rawBody, signature, process.env.MODELRIVER_WEBHOOK_SECRET!)) {
25 return new Response(JSON.stringify({ error: "Invalid signature" }), { status: 401 });
26 }
27 
28 const payload = JSON.parse(rawBody);
29 const { type, event, ai_response, callback_url, customer_data, channel_id } = payload;
30 
31 // 2. 处理事件驱动的工作流
32 if (type === "task.ai_generated" && callback_url) {
33 try {
34 const aiData = ai_response?.data ?? {};
35 
36 // 3. 写入 PlanetScale
37 const result = await db.execute(
38 `INSERT INTO ai_content (title, body, category, metadata, channel_id, event_name)
39 VALUES (?, ?, ?, ?, ?, ?)`,
40 [
41 aiData.title ?? "Untitled",
42 aiData.description ?? aiData.body ?? "",
43 customer_data?.category ?? "general",
44 JSON.stringify({
45 ...customer_data,
46 ai_model: payload.meta?.model,
47 ai_provider: payload.meta?.provider,
48 }),
49 channel_id,
50 event,
51 ]
52 );
53 
54 const recordId = result.insertId;
55 
56 // 4. 回调 ModelRiver
57 await fetch(callback_url, {
58 method: "POST",
59 headers: {
60 Authorization: `Bearer ${process.env.MODELRIVER_API_KEY}`,
61 "Content-Type": "application/json",
62 },
63 body: JSON.stringify({
64 data: {
65 ...aiData,
66 id: recordId,
67 saved_at: new Date().toISOString(),
68 },
69 task_id: `planetscale_${recordId}`,
70 metadata: {
71 database: "planetscale",
72 table: "ai_content",
73 record_id: recordId,
74 },
75 }),
76 });
77 
78 return new Response(JSON.stringify({ received: true }), { status: 200 });
79 
80 } catch (error: any) {
81 console.error("错误:", error);
82 
83 await fetch(callback_url, {
84 method: "POST",
85 headers: {
86 Authorization: `Bearer ${process.env.MODELRIVER_API_KEY}`,
87 "Content-Type": "application/json",
88 },
89 body: JSON.stringify({
90 error: "processing_failed",
91 message: error.message,
92 }),
93 });
94 
95 return new Response(JSON.stringify({ received: true }), { status: 200 });
96 }
97 }
98 
99 return new Response(JSON.stringify({ received: true }), { status: 200 });
100}

Next.js API 路由示例

TYPESCRIPT
1// app/api/webhooks/modelriver/route.ts
2import { NextRequest, NextResponse } from "next/server";
3import { connect } from "@planetscale/database";
4 
5const db = connect({
6 url: process.env.DATABASE_URL, // PlanetScale 连接字符串
7});
8 
9export async function POST(request: NextRequest) {
10 // ... 签名验证(见上文) ...
11 
12 const payload = await request.json();
13 
14 if (payload.type === "task.ai_generated" && payload.callback_url) {
15 const aiData = payload.ai_response?.data ?? {};
16 
17 const result = await db.execute(
18 "INSERT INTO ai_content (title, body, metadata) VALUES (?, ?, ?)",
19 [aiData.title, aiData.description, JSON.stringify(payload.customer_data)]
20 );
21 
22 await fetch(payload.callback_url, {
23 method: "POST",
24 headers: {
25 Authorization: `Bearer ${process.env.MODELRIVER_API_KEY}`,
26 "Content-Type": "application/json",
27 },
28 body: JSON.stringify({
29 data: { ...aiData, id: result.insertId },
30 task_id: `ps_${result.insertId}`,
31 }),
32 });
33 
34 return NextResponse.json({ received: true });
35 }
36 
37 return NextResponse.json({ received: true });
38}

架构分支 (Branching)

PlanetScale 的分支模型让您可以安全地演进 AI 内容架构:

Bash
# 创建开发分支
pscale branch create my-db ai-content-v2
 
# 添加新列
pscale shell my-db ai-content-v2
> ALTER TABLE ai_content ADD COLUMN embedding_id VARCHAR(100);
> ALTER TABLE ai_content ADD COLUMN quality_score FLOAT;
 
# 创建部署请求
pscale deploy-request create my-db ai-content-v2
 
# 审核后进行部署
pscale deploy-request deploy my-db <id>

最佳实践

  1. 使用无服务器驱动程序@planetscale/database 可以在 Edge/Serverless 环境中运行。
  2. 使用参数化查询:始终使用 ? 占位符以防止 SQL 注入。
  3. 将元数据存储为 JSON:灵活的架构,适应各种 AI 响应形式。
  4. 使用数据库分支:在部署到生产环境之前,使用 AI 数据测试架构变更。
  5. channel_id 上建立索引:实现快速的事件追踪查询。

下一步