Documentation

Event-driven AI with Convex

Receive AI webhooks in Convex HTTP actions, write data with mutations, and stream results to clients with real-time queries: zero polling required.

Overview

Convex is a reactive database platform where queries automatically update when data changes. Combine it with ModelRiver event-driven AI and you get an end-to-end reactive pipeline: AI generates data → webhook writes to Convex → your React components update instantly.

What you'll build:

  • A Convex HTTP action that receives ModelRiver webhooks
  • A mutation for writing AI-generated data
  • Real-time queries that update the UI automatically
  • Callback to ModelRiver with record details

Schema

TYPESCRIPT
1// convex/schema.ts
2import { defineSchema, defineTable } from "convex/server";
3import { v } from "convex/values";
4 
5export default defineSchema({
6 aiContent: defineTable({
7 title: v.string(),
8 body: v.optional(v.string()),
9 category: v.string(),
10 metadata: v.any(),
11 channelId: v.optional(v.string()),
12 eventName: v.optional(v.string()),
13 source: v.string(),
14 })
15 .index("by_channel", ["channelId"])
16 .index("by_event", ["eventName"])
17 .index("by_creation", ["_creationTime"]),
18});

Mutation

TYPESCRIPT
1// convex/aiContent.ts
2import { mutation, query } from "./_generated/server";
3import { v } from "convex/values";
4 
5export const insert = mutation({
6 args: {
7 title: v.string(),
8 body: v.optional(v.string()),
9 category: v.string(),
10 metadata: v.any(),
11 channelId: v.optional(v.string()),
12 eventName: v.optional(v.string()),
13 },
14 handler: async (ctx, args) => {
15 const id = await ctx.db.insert("aiContent", {
16 ...args,
17 source: "modelriver",
18 });
19 return id;
20 },
21});
22 
23export const list = query({
24 args: {
25 limit: v.optional(v.number()),
26 },
27 handler: async (ctx, args) => {
28 return await ctx.db
29 .query("aiContent")
30 .order("desc")
31 .take(args.limit ?? 20);
32 },
33});
34 
35export const getByChannel = query({
36 args: {
37 channelId: v.string(),
38 },
39 handler: async (ctx, args) => {
40 return await ctx.db
41 .query("aiContent")
42 .withIndex("by_channel", (q) => q.eq("channelId", args.channelId))
43 .first();
44 },
45});

HTTP action (webhook handler)

TYPESCRIPT
1// convex/http.ts
2import { httpRouter } from "convex/server";
3import { httpAction } from "./_generated/server";
4import { api } from "./_generated/api";
5 
6const http = httpRouter();
7 
8http.route({
9 path: "/webhooks/modelriver",
10 method: "POST",
11 handler: httpAction(async (ctx, request) => {
12 const signature = request.headers.get("mr-signature") ?? "";
13 const rawBody = await request.text();
14 
15 // 1. Verify signature
16 const webhookSecret = process.env.MODELRIVER_WEBHOOK_SECRET ?? "";
17 const encoder = new TextEncoder();
18 const key = await crypto.subtle.importKey(
19 "raw",
20 encoder.encode(webhookSecret),
21 { name: "HMAC", hash: "SHA-256" },
22 false,
23 ["sign"]
24 );
25 const sig = await crypto.subtle.sign("HMAC", key, encoder.encode(rawBody));
26 const expected = Array.from(new Uint8Array(sig))
27 .map((b) => b.toString(16).padStart(2, "0"))
28 .join("");
29 
30 if (expected !== signature) {
31 return new Response(JSON.stringify({ error: "Invalid signature" }), {
32 status: 401,
33 headers: { "Content-Type": "application/json" },
34 });
35 }
36 
37 const payload = JSON.parse(rawBody);
38 const { type, event, ai_response, callback_url, customer_data, channel_id } = payload;
39 
40 // 2. Handle event-driven workflow
41 if (type === "task.ai_generated" && callback_url) {
42 try {
43 const aiData = ai_response?.data ?? {};
44 
45 // 3. Write to Convex database via mutation
46 const recordId = await ctx.runMutation(api.aiContent.insert, {
47 title: aiData.title ?? "Untitled",
48 body: aiData.description ?? aiData.body ?? "",
49 category: customer_data?.category ?? "general",
50 metadata: {
51 ...customer_data,
52 ai_model: payload.meta?.model,
53 ai_provider: payload.meta?.provider,
54 },
55 channelId: channel_id,
56 eventName: event,
57 });
58 
59 // 4. Call back to ModelRiver
60 const callbackResponse = await fetch(callback_url, {
61 method: "POST",
62 headers: {
63 Authorization: `Bearer ${process.env.MODELRIVER_API_KEY}`,
64 "Content-Type": "application/json",
65 },
66 body: JSON.stringify({
67 data: {
68 ...aiData,
69 id: recordId,
70 saved_at: new Date().toISOString(),
71 },
72 task_id: `convex_${recordId}`,
73 metadata: {
74 database: "convex",
75 table: "aiContent",
76 record_id: recordId,
77 },
78 }),
79 });
80 
81 if (!callbackResponse.ok) {
82 throw new Error(`Callback failed: ${callbackResponse.status}`);
83 }
84 
85 return new Response(JSON.stringify({ received: true }), {
86 status: 200,
87 headers: { "Content-Type": "application/json" },
88 });
89 
90 } catch (error: any) {
91 console.error("Error:", error);
92 
93 await fetch(callback_url, {
94 method: "POST",
95 headers: {
96 Authorization: `Bearer ${process.env.MODELRIVER_API_KEY}`,
97 "Content-Type": "application/json",
98 },
99 body: JSON.stringify({
100 error: "processing_failed",
101 message: error.message,
102 }),
103 });
104 
105 return new Response(JSON.stringify({ received: true }), {
106 status: 200,
107 headers: { "Content-Type": "application/json" },
108 });
109 }
110 }
111 
112 return new Response(JSON.stringify({ received: true }), {
113 status: 200,
114 headers: { "Content-Type": "application/json" },
115 });
116 }),
117});
118 
119export default http;

React frontend (real-time)

Convex queries are reactive: the UI updates automatically when the webhook writes new data:

TSX
1// src/App.tsx
2import { useQuery } from "convex/react";
3import { api } from "../convex/_generated/api";
4 
5export function AiContentFeed() {
6 // This query automatically updates when new records are inserted!
7 const items = useQuery(api.aiContent.list, { limit: 10 });
8 
9 if (items === undefined) return <p>Loading...</p>;
10 
11 return (
12 <div className="space-y-4">
13 {items.map((item) => (
14 <div key={item._id} className="p-4 border rounded-lg">
15 <h3 className="font-bold text-lg">{item.title}</h3>
16 <p className="text-gray-600">{item.body}</p>
17 <div className="flex gap-2 mt-2">
18 <span className="text-xs bg-blue-100 text-blue-800 px-2 py-1 rounded">
19 {item.category}
20 </span>
21 <span className="text-xs text-gray-400">
22 {new Date(item._creationTime).toLocaleString()}
23 </span>
24 </div>
25 </div>
26 ))}
27 </div>
28 );
29}

No WebSocket setup, no polling, no subscriptions: Convex handles reactivity automatically.


Best practices

  1. Use HTTP actions for webhooks: Convex HTTP actions can receive external HTTP requests.
  2. Use mutations for writes: All database writes go through validated mutations.
  3. Leverage reactive queries: No need for separate WebSocket/polling infrastructure.
  4. Use Web Crypto API: Convex HTTP actions run in a V8 environment with built-in Web Crypto.
  5. Store environment variables: Use Convex dashboard settings for API keys and secrets.

Next steps