概述
Ruby on Rails 的“约定优于配置 (Convention-over-configuration)”方法使 Webhook 处理变得简洁且易于维护。使用控制器动作作为端点,使用 Active Job 进行后台处理,并使用 Rails Credentials 进行安全的密钥管理。
您将构建的内容:
- 一个带有 HMAC 签名验证的 Webhook 控制器
- 用于异步处理的 Active Job 后台任务
- 使用 Net::HTTP 或 Faraday 发起 HTTP 回调
- 用于 API 密钥管理的 Rails Credentials 集成
快速开始
安装依赖
Bash
rails new my-ai-app --api
cd my-ai-app
在您的 Gemfile 中添加:
RUBY
gem "faraday"
Bash
bundle install
配置
Bash
# 使用 Rails Credentials 存储密钥
EDITOR="code --wait" rails credentials:edit
YAML
# config/credentials.yml.enc
modelriver:
  api_key: mr_live_YOUR_API_KEY
  webhook_secret: your_webhook_secret
Webhook 控制器
RUBY
# app/controllers/webhooks/modelriver_controller.rb
module Webhooks
  # Receives ModelRiver webhooks.
  #
  # Every request must carry an `mr-signature` header holding the hex
  # HMAC-SHA256 of the raw request body, keyed with the webhook secret stored
  # in Rails credentials. Verified "task.ai_generated" events that include a
  # callback URL are handed to ProcessAiWebhookJob; everything else is logged
  # and acknowledged.
  class ModelriverController < ApplicationController
    # API-only apps (`rails new --api`) never define the CSRF callback, and
    # skipping an undefined callback raises ArgumentError unless `raise: false`
    # is given. In a full-stack app this still skips the CSRF check.
    skip_before_action :verify_authenticity_token, raise: false
    before_action :verify_signature

    # POST /webhooks/modelriver
    #
    # Responds 400 when the body is not a JSON object, otherwise 200 with
    # `{ received: true }`.
    def create
      payload = parse_payload
      if payload.nil?
        render json: { error: "Invalid JSON payload" }, status: :bad_request
        return
      end

      type = payload["type"]
      callback_url = payload["callback_url"]

      if type == "task.ai_generated" && callback_url.present?
        # Event-driven workflow: process in the background so the webhook
        # response is not blocked.
        ProcessAiWebhookJob.perform_later(
          event: payload["event"],
          ai_response: payload["ai_response"],
          callback_url: callback_url,
          customer_data: payload["customer_data"] || {}
        )
      else
        # Standard webhook
        Rails.logger.info("收到标准 Webhook: #{type}")
      end

      render json: { received: true }, status: :ok
    end

    private

    # Returns the request body parsed as a Hash, or nil when the body is not
    # valid JSON or is valid JSON that is not an object (e.g. an array).
    def parse_payload
      parsed = JSON.parse(read_raw_body)
      parsed if parsed.is_a?(Hash)
    rescue JSON::ParserError
      nil
    end

    # Reads the whole request body and leaves the stream rewound, so the
    # signature check and the action both see the complete body.
    def read_raw_body
      request.body.rewind
      raw_body = request.body.read
      request.body.rewind
      raw_body
    end

    # before_action: halts the request unless the signature header matches
    # the HMAC of the raw body. Rendering or calling `head` here halts the
    # callback chain, so `create` never runs for rejected requests.
    def verify_signature
      secret = Rails.application.credentials.dig(:modelriver, :webhook_secret)
      if secret.blank?
        # Without a secret OpenSSL::HMAC would raise TypeError; fail closed
        # and leave a clear message in the log instead.
        Rails.logger.error("ModelRiver webhook secret is not configured")
        head :internal_server_error
        return
      end

      signature = request.headers["mr-signature"].to_s
      expected = OpenSSL::HMAC.hexdigest("SHA256", secret, read_raw_body)

      # Constant-time comparison to avoid leaking the digest through timing.
      return if ActiveSupport::SecurityUtils.secure_compare(expected, signature)

      render json: { error: "Invalid signature" }, status: :unauthorized
    end
  end
end
路由
RUBY
# config/routes.rb
Rails.application.routes.draw do
  # POST /webhooks/modelriver is handled by Webhooks::ModelriverController#create.
  namespace :webhooks do
    post "modelriver", controller: "modelriver", action: "create"
  end
end
Active Job
RUBY
# app/jobs/process_ai_webhook_job.rb

# Processes an event-driven ModelRiver webhook in the background: runs the
# application's business logic for the event, then posts the enriched data
# back to the callback URL supplied by the webhook.
#
# NOTE(review): `retry_on StandardError` re-runs the whole job, so records
# created before a failed callback are created again on retry. Make the
# business logic idempotent if duplicates matter.
class ProcessAiWebhookJob < ApplicationJob
  queue_as :default
  retry_on StandardError, wait: 10.seconds, attempts: 3

  # @param event [String] event name from the webhook payload
  # @param ai_response [Hash, nil] AI response; its "data" entry is enriched
  # @param callback_url [String] URL the result is posted back to
  # @param customer_data [Hash, nil] caller-supplied data from the payload
  # @raise [Faraday::Error] when the callback request fails (after a
  #   best-effort error callback has been attempted)
  def perform(event:, ai_response:, callback_url:, customer_data:)
    # Tolerate payloads that omit ai_response or customer_data entirely.
    enriched_data = ((ai_response || {})["data"] || {}).dup
    customer_data ||= {}

    apply_business_logic(event, enriched_data, customer_data)

    # Call back to ModelRiver. The :json middleware encodes the Hash body
    # and sets the Content-Type header.
    callback_connection.post(callback_url) do |req|
      req.body = {
        data: enriched_data,
        task_id: "rails_#{event}_#{Time.current.to_i}",
        metadata: {
          processed_by: "rails",
          processed_at: Time.current.iso8601
        }
      }
    end

    Rails.logger.info("✅ 已针对事件发送回调: #{event}")
  rescue Faraday::Error => e
    Rails.logger.error("❌ 回调失败: #{e.message}")
    send_error_callback(callback_url, e)
    raise
  end

  private

  # Your custom business logic. Mutates +enriched_data+ in place with the
  # results of persisting records for the given event.
  def apply_business_logic(event, enriched_data, customer_data)
    case event
    when "content_ready"
      content = Content.create!(
        title: enriched_data["title"],
        body: enriched_data["description"],
        category: customer_data["category"] || "general",
        source: "modelriver"
      )
      enriched_data["id"] = content.id
      enriched_data["slug"] = content.slug
      enriched_data["saved_at"] = Time.current.iso8601
    when "entities_extracted"
      entities = enriched_data["entities"] || []
      entities.each do |entity|
        Entity.create!(
          name: entity["name"],
          entity_type: entity["type"],
          source_id: customer_data["document_id"]
        )
      end
      enriched_data["entities_saved"] = entities.length
    end
  end

  # Builds the Faraday connection used for both the success and the error
  # callback: JSON bodies, bearer auth, 10-second timeout, raises on 4xx/5xx.
  def callback_connection
    api_key = Rails.application.credentials.dig(:modelriver, :api_key)

    Faraday.new(headers: { "Authorization" => "Bearer #{api_key}" }) do |f|
      f.request :json
      f.response :raise_error
      f.options.timeout = 10
    end
  end

  # Best-effort error callback. A failure here is only logged, so it cannot
  # replace the original exception that the caller re-raises for retry.
  def send_error_callback(callback_url, error)
    callback_connection.post(callback_url) do |req|
      req.body = {
        error: "processing_failed",
        message: error.message
      }
    end
  rescue Faraday::Error => e
    Rails.logger.error("Error callback failed: #{e.message}")
  end
end
触发异步请求
RUBY
# app/services/modelriver_service.rb

# Thin client for the ModelRiver HTTP API.
class ModelriverService
  BASE_URL = "https://api.modelriver.com".freeze

  # Reads the API key from Rails credentials and builds a JSON connection
  # with a 10-second timeout that raises Faraday::Error on 4xx/5xx responses.
  def initialize
    @api_key = Rails.application.credentials.dig(:modelriver, :api_key)
    @conn = Faraday.new(url: BASE_URL) do |f|
      f.request :json
      f.response :json
      f.response :raise_error
      f.options.timeout = 10
    end
  end

  # Starts an asynchronous AI request.
  #
  # @param workflow [String] workflow name
  # @param prompt [String] sent as a single "user" message
  # @param metadata [Hash] passed through to the API unchanged
  # @return [Object] the response body as parsed by the :json middleware
  # @raise [Faraday::Error] on connection failures, timeouts and 4xx/5xx
  def trigger_async(workflow:, prompt:, metadata: {})
    response = @conn.post("/v1/ai/async") do |req|
      req.headers["Authorization"] = "Bearer #{@api_key}"
      req.body = {
        workflow: workflow,
        messages: [{ role: "user", content: prompt }],
        metadata: metadata
      }
    end

    response.body
  end
end

# Usage in a controller
class AiController < ApplicationController
  # Triggers the "content_generator" workflow for the current user and
  # returns the channel details the client needs to follow the result.
  #
  # A missing `prompt` parameter raises ActionController::ParameterMissing
  # (answered by Rails with 400) instead of sending a nil prompt upstream.
  # Upstream failures are answered with 502 rather than an unhandled 500.
  def generate
    service = ModelriverService.new
    result = service.trigger_async(
      workflow: "content_generator",
      prompt: params.require(:prompt),
      metadata: { user_id: current_user.id }
    )

    render json: {
      channel_id: result["channel_id"],
      ws_token: result["ws_token"],
      websocket_channel: result["websocket_channel"]
    }
  rescue Faraday::Error => e
    Rails.logger.error("ModelRiver request failed: #{e.message}")
    render json: { error: "upstream_request_failed" }, status: :bad_gateway
  end
end
最佳实践
- 使用 Active Job:绝不要阻塞 Webhook 响应;将其分发到后台队列。
- 使用 Rails Credentials:安全地存储 API 密钥和 Webhook 密钥。
- 使用 secure_compare:防止在签名验证时遭受计时攻击。
- 重置请求 Body 流 (Rewind):为进行签名验证而读取原始 Body 后,需调用 request.body.rewind。
- 使用 Faraday:统一的 HTTP 客户端,支持重试和超时。
下一步
- Phoenix 事件驱动指南:Elixir 备选方案
- Webhooks 参考:重试策略和投递监控
- 事件驱动 AI 概述:架构和流程