
Event-driven AI with PlanetScale

Receive AI webhooks and write structured data directly to PlanetScale's serverless MySQL, with branching for safe schema evolution.

Overview

PlanetScale provides a serverless MySQL-compatible database powered by Vitess. Its branching model lets you iterate on schemas safely, while the serverless driver works natively in edge environments, making it ideal for webhook-driven AI data pipelines.

What you'll build:

  • A webhook handler that writes AI data to PlanetScale
  • Database schema for AI-generated content
  • Serverless driver usage for edge/serverless environments
  • Callback to ModelRiver with enriched records

Database schema

SQL
-- Create via PlanetScale console or CLI
CREATE TABLE ai_content (
  id BIGINT AUTO_INCREMENT PRIMARY KEY,
  title VARCHAR(500) NOT NULL,
  body TEXT,
  category VARCHAR(100) DEFAULT 'general',
  metadata JSON,
  channel_id VARCHAR(36),
  event_name VARCHAR(100),
  created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
  updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  INDEX idx_channel (channel_id),
  INDEX idx_event (event_name)
);
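Before wiring the handler up, it can help to see how a webhook payload maps onto these columns. The sketch below is a minimal, standalone normalizer that applies the same defaults the schema implies (a NOT NULL title, a 'general' category); the `WebhookPayload` interface is an assumption based on the fields used elsewhere in this guide, not an official ModelRiver type.

```typescript
// Assumed shape of the ModelRiver payload fields used in this guide.
interface WebhookPayload {
  event?: string;
  channel_id?: string;
  ai_response?: { data?: { title?: string; description?: string; body?: string } };
  customer_data?: { category?: string; [k: string]: unknown };
  meta?: { model?: string; provider?: string };
}

// Normalize a payload into the parameter array for the ai_content INSERT,
// applying defaults for the NOT NULL title and the 'general' category.
export function toInsertParams(p: WebhookPayload): (string | null)[] {
  const ai = p.ai_response?.data ?? {};
  return [
    ai.title ?? "Untitled",
    ai.description ?? ai.body ?? "",
    p.customer_data?.category ?? "general",
    JSON.stringify({
      ...p.customer_data,
      ai_model: p.meta?.model ?? null,
      ai_provider: p.meta?.provider ?? null,
    }),
    p.channel_id ?? null,
    p.event ?? null,
  ];
}
```

Keeping this mapping in a pure function makes it easy to unit-test the defaults without a database connection.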

Webhook handler (Node.js)

TYPESCRIPT
// Using @planetscale/database serverless driver
import { connect } from "@planetscale/database";
import crypto from "crypto";

const db = connect({
  host: process.env.PLANETSCALE_HOST,
  username: process.env.PLANETSCALE_USERNAME,
  password: process.env.PLANETSCALE_PASSWORD,
});

function verifySignature(payload: string, signature: string, secret: string): boolean {
  const expected = crypto
    .createHmac("sha256", secret)
    .update(payload)
    .digest("hex");
  const sig = Buffer.from(signature);
  const exp = Buffer.from(expected);
  // timingSafeEqual throws on length mismatch, so guard first
  return sig.length === exp.length && crypto.timingSafeEqual(sig, exp);
}

export async function handleWebhook(req: Request): Promise<Response> {
  const signature = req.headers.get("mr-signature") ?? "";
  const rawBody = await req.text();

  // 1. Verify signature
  if (!verifySignature(rawBody, signature, process.env.MODELRIVER_WEBHOOK_SECRET!)) {
    return new Response(JSON.stringify({ error: "Invalid signature" }), { status: 401 });
  }

  const payload = JSON.parse(rawBody);
  const { type, event, ai_response, callback_url, customer_data, channel_id } = payload;

  // 2. Handle event-driven workflow
  if (type === "task.ai_generated" && callback_url) {
    try {
      const aiData = ai_response?.data ?? {};

      // 3. Write to PlanetScale
      const result = await db.execute(
        `INSERT INTO ai_content (title, body, category, metadata, channel_id, event_name)
         VALUES (?, ?, ?, ?, ?, ?)`,
        [
          aiData.title ?? "Untitled",
          aiData.description ?? aiData.body ?? "",
          customer_data?.category ?? "general",
          JSON.stringify({
            ...customer_data,
            ai_model: payload.meta?.model,
            ai_provider: payload.meta?.provider,
          }),
          channel_id,
          event,
        ]
      );

      const recordId = result.insertId;

      // 4. Call back to ModelRiver
      await fetch(callback_url, {
        method: "POST",
        headers: {
          Authorization: `Bearer ${process.env.MODELRIVER_API_KEY}`,
          "Content-Type": "application/json",
        },
        body: JSON.stringify({
          data: {
            ...aiData,
            id: recordId,
            saved_at: new Date().toISOString(),
          },
          task_id: `planetscale_${recordId}`,
          metadata: {
            database: "planetscale",
            table: "ai_content",
            record_id: recordId,
          },
        }),
      });

      return new Response(JSON.stringify({ received: true }), { status: 200 });

    } catch (error) {
      console.error("Error:", error);

      // Report the failure to ModelRiver; ignore errors from the callback itself
      await fetch(callback_url, {
        method: "POST",
        headers: {
          Authorization: `Bearer ${process.env.MODELRIVER_API_KEY}`,
          "Content-Type": "application/json",
        },
        body: JSON.stringify({
          error: "processing_failed",
          message: error instanceof Error ? error.message : String(error),
        }),
      }).catch(() => {});

      return new Response(JSON.stringify({ received: true }), { status: 200 });
    }
  }

  return new Response(JSON.stringify({ received: true }), { status: 200 });
}
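To exercise the handler locally you need a valid mr-signature header. The sketch below mirrors the HMAC scheme used in verifySignature (hex-encoded SHA-256 over the raw body) so you can sign a sample payload yourself; the secret and payload here are placeholders, not real ModelRiver values.

```typescript
import crypto from "node:crypto";

// Sign a raw request body the same way ModelRiver's mr-signature is verified above:
// hex-encoded HMAC-SHA256 over the exact bytes of the body.
export function signPayload(rawBody: string, secret: string): string {
  return crypto.createHmac("sha256", secret).update(rawBody).digest("hex");
}

// Constant-time check, guarding the length first since timingSafeEqual
// throws on inputs of different lengths.
export function verifyPayload(rawBody: string, signature: string, secret: string): boolean {
  const sig = Buffer.from(signature);
  const exp = Buffer.from(signPayload(rawBody, secret));
  return sig.length === exp.length && crypto.timingSafeEqual(sig, exp);
}

// Example: headers for a local test request against the handler.
const body = JSON.stringify({ type: "task.ai_generated", event: "demo" });
const headers = { "mr-signature": signPayload(body, "test-secret") };
```

Note that the body must be signed byte-for-byte as sent; re-serializing the JSON on either side (which can reorder keys or change whitespace) will produce a different signature.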

Next.js API route example

TYPESCRIPT
// app/api/webhooks/modelriver/route.ts
import { NextRequest, NextResponse } from "next/server";
import { connect } from "@planetscale/database";

const db = connect({
  url: process.env.DATABASE_URL, // PlanetScale connection string
});

export async function POST(request: NextRequest) {
  // ... signature verification (see above) ...

  const payload = await request.json();

  if (payload.type === "task.ai_generated" && payload.callback_url) {
    const aiData = payload.ai_response?.data ?? {};

    const result = await db.execute(
      "INSERT INTO ai_content (title, body, metadata) VALUES (?, ?, ?)",
      [
        aiData.title ?? "Untitled", // title is NOT NULL in the schema
        aiData.description ?? "",
        JSON.stringify(payload.customer_data ?? {}),
      ]
    );

    await fetch(payload.callback_url, {
      method: "POST",
      headers: {
        Authorization: `Bearer ${process.env.MODELRIVER_API_KEY}`,
        "Content-Type": "application/json",
      },
      body: JSON.stringify({
        data: { ...aiData, id: result.insertId },
        task_id: `ps_${result.insertId}`,
      }),
    });

    return NextResponse.json({ received: true });
  }

  return NextResponse.json({ received: true });
}

Schema branching

PlanetScale's branching model lets you evolve your AI content schema safely:

Bash
# Create a development branch
pscale branch create my-db ai-content-v2
 
# Add new columns
pscale shell my-db ai-content-v2
> ALTER TABLE ai_content ADD COLUMN embedding_id VARCHAR(100);
> ALTER TABLE ai_content ADD COLUMN quality_score FLOAT;
 
# Create deploy request
pscale deploy-request create my-db ai-content-v2
 
# Deploy after review
pscale deploy-request deploy my-db <id>

Best practices

  1. Use the serverless driver: @planetscale/database works in edge/serverless environments.
  2. Use parameterized queries: Always use ? placeholders to prevent SQL injection.
  3. Store metadata as JSON: Flexible schema for varying AI response shapes.
  4. Use database branching: Test schema changes with AI data before deploying to production.
  5. Index channel_id: Enable fast lookups for event tracking.
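Practice 3 is easiest to uphold if the metadata JSON always carries the same top-level keys, whatever shape the AI response takes. Below is a minimal sketch of such a builder; the `buildMetadata` helper and its field names are illustrative, following the metadata object assembled in the handler above.

```typescript
// Build the metadata JSON stored in the ai_content.metadata column.
// Whatever customer_data arrives, the AI bookkeeping keys are always present,
// so downstream queries (e.g. JSON_EXTRACT on ai_model) never hit missing paths.
export function buildMetadata(
  customerData: Record<string, unknown> | undefined,
  meta: { model?: string; provider?: string } | undefined
): string {
  return JSON.stringify({
    ...(customerData ?? {}),
    ai_model: meta?.model ?? null,
    ai_provider: meta?.provider ?? null,
  });
}
```

With stable keys you can query the JSON column directly in MySQL, for example `SELECT ... WHERE metadata->>'$.ai_model' = ?`, without per-row shape checks.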

Next steps