Documentation

Event-driven AI with Neon

Receive AI webhooks and write structured data to Neon's serverless Postgres: with branching, autoscaling, and a native serverless driver.

Overview

Neon provides serverless Postgres with branching, autoscaling to zero, and a WebSocket-based serverless driver that works in edge environments. Use it with event-driven AI to build fully serverless data pipelines where AI-generated data lands directly in Postgres.

What you'll build:

  • A webhook handler using Neon's serverless driver
  • Database schema for AI-generated content
  • Connection pooling for production workloads
  • Callback to ModelRiver with enriched records

Database schema

SQL
1-- Create via Neon console or psql
2CREATE TABLE ai_content (
3 id UUID DEFAULT gen_random_uuid() PRIMARY KEY,
4 title TEXT NOT NULL,
5 body TEXT,
6 category TEXT DEFAULT 'general',
7 metadata JSONB DEFAULT '{}',
8 channel_id TEXT,
9 event_name TEXT,
10 created_at TIMESTAMPTZ DEFAULT now(),
11 updated_at TIMESTAMPTZ DEFAULT now()
12);
13 
14CREATE INDEX idx_ai_content_channel ON ai_content(channel_id);
15CREATE INDEX idx_ai_content_event ON ai_content(event_name);
16CREATE INDEX idx_ai_content_created ON ai_content(created_at DESC);

Webhook handler (Node.js)

TYPESCRIPT
1// Using @neondatabase/serverless driver
2import { neon } from "@neondatabase/serverless";
3import crypto from "crypto";
4 
5const sql = neon(process.env.DATABASE_URL!);
6 
7function verifySignature(payload: string, signature: string, secret: string): boolean {
8 const expected = crypto
9 .createHmac("sha256", secret)
10 .update(payload)
11 .digest("hex");
12 return crypto.timingSafeEqual(Buffer.from(signature), Buffer.from(expected));
13}
14 
15export async function handleWebhook(req: Request): Promise<Response> {
16 const signature = req.headers.get("mr-signature") ?? "";
17 const rawBody = await req.text();
18 
19 // 1. Verify signature
20 if (!verifySignature(rawBody, signature, process.env.MODELRIVER_WEBHOOK_SECRET!)) {
21 return new Response(JSON.stringify({ error: "Invalid signature" }), { status: 401 });
22 }
23 
24 const payload = JSON.parse(rawBody);
25 const { type, event, ai_response, callback_url, customer_data, channel_id } = payload;
26 
27 // 2. Handle event-driven workflow
28 if (type === "task.ai_generated" && callback_url) {
29 try {
30 const aiData = ai_response?.data ?? {};
31 
32 // 3. Write to Neon
33 const [record] = await sql`
34 INSERT INTO ai_content (title, body, category, metadata, channel_id, event_name)
35 VALUES (
36 ${aiData.title ?? "Untitled"},
37 ${aiData.description ?? aiData.body ?? ""},
38 ${customer_data?.category ?? "general"},
39 ${JSON.stringify({
40 ...customer_data,
41 ai_model: payload.meta?.model,
42 ai_provider: payload.meta?.provider,
43 })}::jsonb,
44 ${channel_id},
45 ${event}
46 )
47 RETURNING id, created_at
48 `;
49 
50 // 4. Call back to ModelRiver
51 await fetch(callback_url, {
52 method: "POST",
53 headers: {
54 Authorization: `Bearer ${process.env.MODELRIVER_API_KEY}`,
55 "Content-Type": "application/json",
56 },
57 body: JSON.stringify({
58 data: {
59 ...aiData,
60 id: record.id,
61 saved_at: record.created_at,
62 },
63 task_id: `neon_${record.id}`,
64 metadata: {
65 database: "neon",
66 table: "ai_content",
67 record_id: record.id,
68 },
69 }),
70 });
71 
72 return new Response(JSON.stringify({ received: true }), { status: 200 });
73 
74 } catch (error: any) {
75 console.error("Error:", error);
76 
77 await fetch(callback_url, {
78 method: "POST",
79 headers: {
80 Authorization: `Bearer ${process.env.MODELRIVER_API_KEY}`,
81 "Content-Type": "application/json",
82 },
83 body: JSON.stringify({
84 error: "processing_failed",
85 message: error.message,
86 }),
87 });
88 
89 return new Response(JSON.stringify({ received: true }), { status: 200 });
90 }
91 }
92 
93 return new Response(JSON.stringify({ received: true }), { status: 200 });
94}

Vercel Edge Function example

Neon's serverless driver works natively in edge environments:

TYPESCRIPT
1// app/api/webhooks/modelriver/route.ts
2import { NextRequest, NextResponse } from "next/server";
3import { neon } from "@neondatabase/serverless";
4 
5export const runtime = "edge";
6 
7const sql = neon(process.env.DATABASE_URL!);
8 
9export async function POST(request: NextRequest) {
10 // ... signature verification ...
11 
12 const payload = await request.json();
13 
14 if (payload.type === "task.ai_generated" && payload.callback_url) {
15 const aiData = payload.ai_response?.data ?? {};
16 
17 const [record] = await sql`
18 INSERT INTO ai_content (title, body, metadata)
19 VALUES (${aiData.title}, ${aiData.description}, ${JSON.stringify(payload.customer_data)}::jsonb)
20 RETURNING id, created_at
21 `;
22 
23 await fetch(payload.callback_url, {
24 method: "POST",
25 headers: {
26 Authorization: `Bearer ${process.env.MODELRIVER_API_KEY}`,
27 "Content-Type": "application/json",
28 },
29 body: JSON.stringify({
30 data: { ...aiData, id: record.id, saved_at: record.created_at },
31 task_id: `neon_${record.id}`,
32 }),
33 });
34 
35 return NextResponse.json({ received: true });
36 }
37 
38 return NextResponse.json({ received: true });
39}

Drizzle ORM integration

For type-safe database access:

TYPESCRIPT
1// db/schema.ts
2import { pgTable, uuid, text, jsonb, timestamp } from "drizzle-orm/pg-core";
3 
4export const aiContent = pgTable("ai_content", {
5 id: uuid("id").defaultRandom().primaryKey(),
6 title: text("title").notNull(),
7 body: text("body"),
8 category: text("category").default("general"),
9 metadata: jsonb("metadata").default({}),
10 channelId: text("channel_id"),
11 eventName: text("event_name"),
12 createdAt: timestamp("created_at", { withTimezone: true }).defaultNow(),
13 updatedAt: timestamp("updated_at", { withTimezone: true }).defaultNow(),
14});
15 
16// In your webhook handler
17import { drizzle } from "drizzle-orm/neon-http";
18import { neon } from "@neondatabase/serverless";
19import { aiContent } from "./db/schema";
20 
21const sql = neon(process.env.DATABASE_URL!);
22const db = drizzle(sql);
23 
24const [record] = await db
25 .insert(aiContent)
26 .values({
27 title: aiData.title,
28 body: aiData.description,
29 category: customerData?.category,
30 metadata: customerData,
31 channelId,
32 eventName: event,
33 })
34 .returning();

Database branching

Test AI data schemas without affecting production:

Bash
# Create a branch
neonctl branches create --name ai-content-v2
 
# Get connection string for the branch
neonctl connection-string ai-content-v2
 
# Test your webhook handler against the branch
MODELRIVER_WEBHOOK_SECRET=... DATABASE_URL=<branch-url> npm run dev
 
# Merge when satisfied

Best practices

  1. Use the serverless driver: @neondatabase/serverless uses WebSocket for edge compatibility.
  2. Use tagged template literals: Neon's sql function prevents SQL injection automatically.
  3. Use JSONB for metadata: Flexible schema for varying AI responses.
  4. Use database branching: Test schema changes safely before production.
  5. Use Drizzle ORM: Type-safe queries with automatic schema inference.

Next steps