Event-driven AI for Rails

Receive AI webhooks in a Rails controller, process them with Active Job, and call back to ModelRiver: an idiomatic Ruby approach to webhook handling.

Overview

Ruby on Rails' convention-over-configuration approach keeps webhook handling concise and maintainable. Use a controller action as the endpoint, Active Job for background processing, and Rails Credentials for secure secret management.

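What you will build:

  • A webhook controller with HMAC signature verification
  • An Active Job background job for asynchronous processing
  • HTTP callbacks sent with Net::HTTP or Faraday
  • Rails Credentials integration for API key management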

Quick start

Install dependencies

Bash
rails new my-ai-app --api
cd my-ai-app

Add to your Gemfile:

RUBY
gem "faraday"

Bash
bundle install

Configuration

Bash
# Store secrets with Rails Credentials
EDITOR="code --wait" rails credentials:edit

YAML
# config/credentials.yml.enc
modelriver:
  api_key: mr_live_YOUR_API_KEY
  webhook_secret: your_webhook_secret
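A missing credential only surfaces later as a `nil` passed to the HMAC or the `Authorization` header, so it can help to fail fast when a secret is looked up. The following is a minimal sketch in plain Ruby, assuming a nested Hash as a stand-in for the decrypted credentials; `fetch_credential!` is a hypothetical helper name, not part of Rails.

```ruby
# Stand-in for the decrypted credentials (assumption: same nesting as the YAML above).
CREDENTIALS = {
  modelriver: {
    api_key: "mr_live_YOUR_API_KEY",
    webhook_secret: "your_webhook_secret"
  }
}.freeze

# Hypothetical helper: look up a nested credential and raise if it is absent or blank.
def fetch_credential!(credentials, *path)
  value = credentials.dig(*path)
  raise KeyError, "Missing credential: #{path.join('.')}" if value.nil? || value.to_s.empty?

  value
end

puts fetch_credential!(CREDENTIALS, :modelriver, :webhook_secret)
```

Because the helper only calls `dig`, the same lookup should also work when `Rails.application.credentials` is passed in place of the Hash.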

Webhook controller

RUBY
# app/controllers/webhooks/modelriver_controller.rb
module Webhooks
  class ModelriverController < ApplicationController
    # API-only apps define no CSRF callback, so `raise: false` keeps this
    # line from raising when there is nothing to skip.
    skip_before_action :verify_authenticity_token, raise: false
    before_action :verify_signature

    def create
      payload = JSON.parse(request.body.read)
      type = payload["type"]
      callback_url = payload["callback_url"]

      # Event-driven workflow: hand off to a background job and acknowledge right away
      if type == "task.ai_generated" && callback_url.present?
        ProcessAiWebhookJob.perform_later(
          event: payload["event"],
          ai_response: payload["ai_response"],
          callback_url: callback_url,
          customer_data: payload["customer_data"] || {}
        )

        render json: { received: true }, status: :ok
        return
      end

      # Standard webhook
      Rails.logger.info("Received standard webhook: #{type}")
      render json: { received: true }, status: :ok
    end

    private

    def verify_signature
      signature = request.headers["mr-signature"].to_s
      raw_body = request.body.read
      request.body.rewind # let `create` read the full body again

      secret = Rails.application.credentials.dig(:modelriver, :webhook_secret)
      expected = OpenSSL::HMAC.hexdigest("SHA256", secret, raw_body)

      # Rendering inside a before_action halts the chain, so `create` never runs
      unless ActiveSupport::SecurityUtils.secure_compare(expected, signature)
        render json: { error: "Invalid signature" }, status: :unauthorized
      end
    end
  end
end
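To exercise the endpoint locally, it helps to produce a signature the same way the controller checks one. The following is a minimal sketch in plain Ruby (no Rails), assuming, as the controller does, that the `mr-signature` header carries a hex-encoded HMAC-SHA256 of the raw request body. `sign_payload` and `valid_signature?` are hypothetical helper names, and the hand-written constant-time comparison is only a stand-in for `ActiveSupport::SecurityUtils.secure_compare`.

```ruby
require "openssl"
require "json"

# Hypothetical helper: hex-encoded HMAC-SHA256 of the raw body.
def sign_payload(secret, raw_body)
  OpenSSL::HMAC.hexdigest("SHA256", secret, raw_body)
end

# Hypothetical helper: compares the expected and received signatures
# without returning early on the first differing byte.
def valid_signature?(secret, raw_body, signature)
  expected = sign_payload(secret, raw_body)
  return false unless expected.bytesize == signature.bytesize

  diff = 0
  expected.bytes.zip(signature.bytes) { |a, b| diff |= a ^ b }
  diff.zero?
end

secret = "your_webhook_secret" # placeholder value
body = JSON.generate({ "type" => "task.ai_generated", "event" => "content_ready" })
signature = sign_payload(secret, body)

puts valid_signature?(secret, body, signature)       # true
puts valid_signature?(secret, body + " ", signature) # false: the body was altered
```

The signature must be computed over the exact bytes that are sent; re-serializing the JSON before signing or verifying can change whitespace and invalidate it.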

Routes

RUBY
# config/routes.rb
Rails.application.routes.draw do
  namespace :webhooks do
    post "modelriver", to: "modelriver#create"
  end
end

Active Job

RUBY
# app/jobs/process_ai_webhook_job.rb
class ProcessAiWebhookJob < ApplicationJob
  queue_as :default
  retry_on StandardError, wait: 10.seconds, attempts: 3

  def perform(event:, ai_response:, callback_url:, customer_data:)
    # Tolerate a missing ai_response instead of failing on nil
    enriched_data = (ai_response&.dig("data") || {}).dup

    # Your custom business logic
    case event
    when "content_ready"
      content = Content.create!(
        title: enriched_data["title"],
        body: enriched_data["description"],
        category: customer_data["category"] || "general",
        source: "modelriver"
      )
      enriched_data["id"] = content.id
      enriched_data["slug"] = content.slug
      enriched_data["saved_at"] = Time.current.iso8601

    when "entities_extracted"
      entities = enriched_data["entities"] || []
      entities.each do |entity|
        Entity.create!(
          name: entity["name"],
          entity_type: entity["type"],
          source_id: customer_data["document_id"]
        )
      end
      enriched_data["entities_saved"] = entities.length
    end

    # Call back to ModelRiver
    api_key = Rails.application.credentials.dig(:modelriver, :api_key)

    conn = Faraday.new do |f|
      f.request :json         # encodes the Hash body and sets Content-Type
      f.response :raise_error # raises a Faraday::Error on 4xx/5xx responses
      f.options.timeout = 10
    end

    conn.post(callback_url) do |req|
      req.headers["Authorization"] = "Bearer #{api_key}"
      req.body = {
        data: enriched_data,
        task_id: "rails_#{event}_#{Time.current.to_i}",
        metadata: {
          processed_by: "rails",
          processed_at: Time.current.iso8601
        }
      }
    end

    Rails.logger.info("✅ Callback sent for event: #{event}")
  rescue Faraday::Error => e
    Rails.logger.error("❌ Callback failed: #{e.message}")

    # Send an error callback
    Faraday.post(callback_url) do |req|
      req.headers["Authorization"] = "Bearer #{api_key}"
      req.headers["Content-Type"] = "application/json"
      req.body = {
        error: "processing_failed",
        message: e.message
      }.to_json
    end

    # Re-raise so retry_on can schedule another attempt
    raise
  end
end
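The overview lists Net::HTTP as an alternative to Faraday for the callback. The following is a minimal sketch of that variant using only the standard library; `build_callback_request` and `send_callback` are hypothetical helper names, the URL is a placeholder, and the timeout values are assumptions rather than ModelRiver requirements.

```ruby
require "net/http"
require "json"
require "uri"

# Hypothetical helper: builds (but does not send) the callback request.
def build_callback_request(callback_url, api_key, payload)
  uri = URI.parse(callback_url)
  request = Net::HTTP::Post.new(uri)
  request["Authorization"] = "Bearer #{api_key}"
  request["Content-Type"] = "application/json"
  request.body = JSON.generate(payload)
  [uri, request]
end

# Hypothetical helper: sends the request with explicit timeouts and
# raises on any non-2xx response so the job's retry logic can take over.
def send_callback(uri, request)
  Net::HTTP.start(uri.host, uri.port, use_ssl: uri.scheme == "https",
                                      open_timeout: 5, read_timeout: 10) do |http|
    response = http.request(request)
    raise "Callback failed with HTTP #{response.code}" unless response.is_a?(Net::HTTPSuccess)

    response
  end
end

uri, request = build_callback_request(
  "https://example.com/callback", # placeholder URL
  "mr_live_YOUR_API_KEY",
  { data: { "title" => "Hello" }, task_id: "rails_content_ready_1" }
)
puts request["Authorization"]
puts request.body
```

Inside the job, `send_callback(uri, request)` would take the place of the `conn.post` call. Unlike Faraday with `raise_error`, Net::HTTP does not raise on 4xx/5xx responses by itself, which is why the sketch checks the response class explicitly.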

Triggering an async request

RUBY
# app/services/modelriver_service.rb
class ModelriverService
  BASE_URL = "https://api.modelriver.com"

  def initialize
    @api_key = Rails.application.credentials.dig(:modelriver, :api_key)
    @conn = Faraday.new(url: BASE_URL) do |f|
      f.request :json         # encode request bodies as JSON
      f.response :json        # parse JSON response bodies
      f.response :raise_error # raise on 4xx/5xx responses
      f.options.timeout = 10
    end
  end

  def trigger_async(workflow:, prompt:, metadata: {})
    response = @conn.post("/v1/ai/async") do |req|
      req.headers["Authorization"] = "Bearer #{@api_key}"
      req.body = {
        workflow: workflow,
        messages: [{ role: "user", content: prompt }],
        metadata: metadata
      }
    end

    response.body
  end
end

# Usage in a controller
class AiController < ApplicationController
  def generate
    service = ModelriverService.new
    result = service.trigger_async(
      workflow: "content_generator",
      prompt: params[:prompt],
      metadata: { user_id: current_user.id }
    )

    render json: {
      channel_id: result["channel_id"],
      ws_token: result["ws_token"],
      websocket_channel: result["websocket_channel"]
    }
  end
end

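Best practices

  1. Use Active Job: never block the webhook response; dispatch the work to a background queue.
  2. Use Rails Credentials: store API keys and webhook secrets securely.
  3. Use secure_compare: prevent timing attacks during signature verification.
  4. Rewind the request body stream: after reading the raw body for signature verification, call request.body.rewind so later reads see the full body.
  5. Use Faraday: a unified HTTP client with timeouts, and retries available through middleware.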
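To make the retry idea concrete, the following is a minimal sketch of retrying a block with exponential backoff in plain Ruby. It illustrates the general technique rather than Faraday's or Active Job's own implementation; `with_retries` and its parameters are hypothetical names, and the `sleeper` argument exists only so the waiting can be replaced in tests.

```ruby
# Hypothetical helper: runs the block, retrying on the listed errors with
# exponentially growing delays (base_delay, 2x, 4x, ...) up to `attempts` tries.
def with_retries(attempts: 3, base_delay: 0.5, on: [StandardError], sleeper: ->(s) { sleep(s) })
  tries = 0
  begin
    tries += 1
    yield tries
  rescue *on
    raise if tries >= attempts # out of attempts: surface the last error

    sleeper.call(base_delay * (2**(tries - 1)))
    retry
  end
end

calls = 0
result = with_retries(attempts: 3, base_delay: 0.01) do |attempt|
  calls += 1
  raise IOError, "temporary failure" if attempt < 3 # simulate two failed deliveries

  "delivered on attempt #{attempt}"
end

puts result # delivered on attempt 3
```

In a Rails app, `retry_on` on the job already covers this at the job level; a helper like this is mainly useful when only the HTTP call should be retried without re-running the business logic.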

Next steps