Next.js 的事件驱动 AI

在 Next.js API 路由中接收 AI 生成的 Webhooks,通过自定义逻辑进行处理,并回调 ModelRiver:完整的 App Router 实现方案。

概述

Next.js 的 API 路由(App Router)是处理 ModelRiver 事件驱动 AI Webhooks 的理想方案。您可以在接收结构化 AI 输出、运行服务端逻辑以及发起回调的整个过程中,都复用服务于前端的同一个 Next.js 应用。

您将构建的内容:

  • 一个用于接收 ModelRiver 发出的 AI 生成数据的 Webhook 端点
  • 用于安全处理 Webhook 的签名验证中间件
  • 具有回调 ModelRiver 功能的后台处理逻辑
  • 一个通过 WebSocket 接收最终结果的 React 前端

快速开始

安装依赖

Bash
npx create-next-app@latest my-ai-app --typescript --app
cd my-ai-app
npm install crypto

环境变量

Bash
# .env.local
MODELRIVER_API_KEY=mr_live_YOUR_API_KEY
MODELRIVER_WEBHOOK_SECRET=your_webhook_secret

Webhook 处理程序

API 路由

TYPESCRIPT
1// app/api/webhooks/modelriver/route.ts
2import { NextRequest, NextResponse } from "next/server";
3import crypto from "crypto";
4 
5function verifySignature(
6 payload: string,
7 signature: string,
8 secret: string
9): boolean {
10 const expected = crypto
11 .createHmac("sha256", secret)
12 .update(payload)
13 .digest("hex");
14 
15 return crypto.timingSafeEqual(
16 Buffer.from(signature),
17 Buffer.from(expected)
18 );
19}
20 
21export async function POST(request: NextRequest) {
22 const signature = request.headers.get("mr-signature");
23 const rawBody = await request.text();
24 
25 // 1. 验证 Webhook 签名
26 if (!signature || !verifySignature(rawBody, signature, process.env.MODELRIVER_WEBHOOK_SECRET!)) {
27 return NextResponse.json({ error: "Invalid signature" }, { status: 401 });
28 }
29 
30 const payload = JSON.parse(rawBody);
31 const { type, event, ai_response, callback_url, customer_data } = payload;
32 
33 // 2. 检查这是否是一个事件驱动的工作流
34 if (type === "task.ai_generated" && callback_url) {
35 // 立即响应:在后台处理
36 // 注意:Next.js 在 Serverless 环境中不直接支持真正的后台任务,
37 // 因此我们使用 waitUntil 或在响应前进行处理
38 await processAndCallback(event, ai_response, callback_url, customer_data);
39 
40 return NextResponse.json({ received: true });
41 }
42 
43 // 标准 Webhook(无事件名称)
44 console.log("标准 Webhook:", payload);
45 return NextResponse.json({ received: true });
46}
47 
48async function processAndCallback(
49 event: string,
50 aiResponse: { data: Record<string, unknown> },
51 callbackUrl: string,
52 customerData: Record<string, unknown>
53) {
54 try {
55 let enrichedData = { ...aiResponse.data };
56 
57 // 3. 基于事件的自定义业务逻辑
58 if (event === "content_ready") {
59 // 保存到数据库
60 const savedRecord = await saveToDatabase(aiResponse.data);
61 enrichedData = {
62 ...enrichedData,
63 id: savedRecord.id,
64 slug: generateSlug(aiResponse.data.title as string),
65 saved_at: new Date().toISOString(),
66 };
67 }
68 
69 if (event === "review_complete") {
70 // 推送到外部 API
71 await postToExternalService(aiResponse.data);
72 enrichedData.posted = true;
73 }
74 
75 // 4. 将增强后的数据回调给 ModelRiver
76 const response = await fetch(callbackUrl, {
77 method: "POST",
78 headers: {
79 "Authorization": `Bearer ${process.env.MODELRIVER_API_KEY}`,
80 "Content-Type": "application/json",
81 },
82 body: JSON.stringify({
83 data: enrichedData,
84 task_id: `task_${Date.now()}`,
85 metadata: {
86 processed_at: new Date().toISOString(),
87 event,
88 },
89 }),
90 });
91 
92 if (!response.ok) {
93 throw new Error(`回调失败: ${response.status}`);
94 }
95 
96 console.log(`✅ 已针对事件发送回调: ${event}`);
97 } catch (error) {
98 console.error("处理 Webhook 时出错:", error);
99 
100 // 发送错误回调
101 await fetch(callbackUrl, {
102 method: "POST",
103 headers: {
104 "Authorization": `Bearer ${process.env.MODELRIVER_API_KEY}`,
105 "Content-Type": "application/json",
106 },
107 body: JSON.stringify({
108 error: "processing_failed",
109 message: error instanceof Error ? error.message : "未知错误",
110 }),
111 });
112 }
113}
114 
115// 您的业务逻辑函数
116async function saveToDatabase(data: Record<string, unknown>) {
117 // 替换为您实际的数据库逻辑(Prisma、Drizzle 等)
118 return { id: `rec_${Date.now()}`, ...data };
119}
120 
121function generateSlug(title: string): string {
122 return title.toLowerCase().replace(/[^a-z0-9]+/g, "-").replace(/(^-|-$)/g, "");
123}
124 
125async function postToExternalService(data: Record<string, unknown>) {
126 // 替换为您实际的 API 调用
127 console.log("正在推送到外部服务:", data);
128}

触发异步请求

Server action

TYPESCRIPT
1// app/actions/ai.ts
2"use server";
3 
4export async function triggerContentGeneration(prompt: string) {
5 const response = await fetch("https://api.modelriver.com/v1/ai/async", {
6 method: "POST",
7 headers: {
8 "Authorization": `Bearer ${process.env.MODELRIVER_API_KEY}`,
9 "Content-Type": "application/json",
10 },
11 body: JSON.stringify({
12 workflow: "content_generator",
13 messages: [{ role: "user", content: prompt }],
14 }),
15 });
16 
17 const data = await response.json();
18 return {
19 channelId: data.channel_id,
20 wsToken: data.ws_token,
21 websocketUrl: data.websocket_url,
22 websocketChannel: data.websocket_channel,
23 };
24}

带有 WebSocket 的 React 组件

TSX
1// app/components/AiContentGenerator.tsx
2"use client";
3 
4import { useState, useEffect } from "react";
5import { triggerContentGeneration } from "@/app/actions/ai";
6 
7export function AiContentGenerator() {
8 const [result, setResult] = useState<Record<string, unknown> | null>(null);
9 const [status, setStatus] = useState<"idle" | "processing" | "complete">("idle");
10 
11 async function handleGenerate(prompt: string) {
12 setStatus("processing");
13 
14 const { channelId, wsToken, websocketChannel } = await triggerContentGeneration(prompt);
15 
16 // 连接 WebSocket 以获取实时结果
17 const ws = new WebSocket(
18 `wss://api.modelriver.com/socket/websocket?token=${encodeURIComponent(wsToken)}`
19 );
20 
21 ws.onopen = () => {
22 ws.send(JSON.stringify({
23 topic: websocketChannel,
24 event: "phx_join",
25 payload: {},
26 ref: "1",
27 }));
28 };
29 
30 ws.onmessage = (event) => {
31 const msg = JSON.parse(event.data);
32 
33 if (msg.event === "response") {
34 setResult(msg.payload.data);
35 setStatus("complete");
36 ws.close();
37 }
38 };
39 }
40 
41 return (
42 <div>
43 <button onClick={() => handleGenerate("生成产品描述")}>
44 {status === "processing" ? "正在处理..." : "生成"}
45 </button>
46 {result && <pre>{JSON.stringify(result, null, 2)}</pre>}
47 </div>
48 );
49}

Edge 运行时

对于 Vercel Edge Functions,Webhook 处理程序的工作方式相同:只需添加 edge runtime 的导出即可:

TYPESCRIPT
1// app/api/webhooks/modelriver/route.ts
2export const runtime = "edge";
3 
4// ... 与上面相同的处理程序代码,使用 Web Crypto API 替代 Node crypto 进行签名验证

注意: 在 Edge 上进行 HMAC 验证时,请使用 crypto.subtle.importKeycrypto.subtle.sign 替代 Node.js 的 crypto 模块。


最佳实践

  1. 快速响应:在处理繁重逻辑之前,先返回 200。在 Serverless 环境中,请使用 Vercel 的 waitUntil 执行后台工作。
  2. 验证每个 Webhook:始终校验 mr-signature 请求头。
  3. 使用 Server Actions 进行触发:将 API 密钥保留在服务端。
  4. 处理超时:您的回调必须在 5 分钟内到达 ModelRiver。
  5. 记录处理步骤:使用结构化日志记录,以便调试事件驱动的工作流。

下一步