概述
Phoenix 的实时处理能力(如 Channels、LiveView 和 PubSub)使其非常适合事件驱动的 AI 工作流。您可以在控制器中接收 Webhooks,使用 Oban 进行处理,并立即将最终结果推送到已连接的 LiveView 客户端。
您将构建的内容:
- 一个用于接收 Webhooks 的 Phoenix 控制器端点
- HMAC 签名验证 Plug
- 用于后台处理的 Oban Worker
- 用于实时展示结果的 LiveView 集成
快速开始
创建项目
Bash
mix phx.new my_ai_appcd my_ai_app配置
ELIXIR
1# config/runtime.exs2config :my_ai_app,3 modelriver_api_key: System.get_env("MODELRIVER_API_KEY"),4 modelriver_webhook_secret: System.get_env("MODELRIVER_WEBHOOK_SECRET")依赖项
ELIXIR
1# mix.exs2defp deps do3 [4 {:oban, "~> 2.17"},5 {:req, "~> 0.5"},6 # ... 其他依赖7 ]8end签名验证 Plug
ELIXIR
1# lib/my_ai_app_web/plugs/verify_modelriver_signature.ex2defmodule MyAiAppWeb.Plugs.VerifyModelRiverSignature do3 import Plug.Conn4 require Logger5 6 def init(opts), do: opts7 8 def call(conn, _opts) do9 signature = get_req_header(conn, "mr-signature") |> List.first("")10 secret = Application.get_env(:my_ai_app, :modelriver_webhook_secret)11 12 {:ok, raw_body, conn} = Plug.Conn.read_body(conn)13 14 expected =15 :crypto.mac(:hmac, :sha256, secret, raw_body)16 |> Base.encode16(case: :lower)17 18 if Plug.Crypto.secure_compare(expected, signature) do19 conn20 |> assign(:raw_body, raw_body)21 |> assign(:parsed_body, Jason.decode!(raw_body))22 else23 conn24 |> put_status(:unauthorized)25 |> Phoenix.Controller.json(%{error: "Invalid signature"})26 |> halt()27 end28 end29endWebhook 控制器
ELIXIR
1# lib/my_ai_app_web/controllers/webhook_controller.ex2defmodule MyAiAppWeb.WebhookController do3 use MyAiAppWeb, :controller4 require Logger5 6 plug MyAiAppWeb.Plugs.VerifyModelRiverSignature7 8 def modelriver(conn, _params) do9 payload = conn.assigns.parsed_body10 11 case {payload["type"], payload["callback_url"]} do12 {"task.ai_generated", callback_url} when is_binary(callback_url) ->13 # 将后台任务加入队列14 %{15 "event" => payload["event"],16 "ai_response" => payload["ai_response"],17 "callback_url" => callback_url,18 "customer_data" => payload["customer_data"] || %{}19 }20 |> MyAiApp.Workers.ProcessAiWebhook.new()21 |> Oban.insert()22 23 json(conn, %{received: true})24 25 _ ->26 Logger.info("收到标准 Webhook: #{payload["type"]}")27 json(conn, %{received: true})28 end29 end30end路由
ELIXIR
1# lib/my_ai_app_web/router.ex2scope "/webhooks", MyAiAppWeb do3 pipe_through :api4 post "/modelriver", WebhookController, :modelriver5endOban Worker
ELIXIR
1# lib/my_ai_app/workers/process_ai_webhook.ex2defmodule MyAiApp.Workers.ProcessAiWebhook do3 use Oban.Worker, queue: :webhooks, max_attempts: 34 require Logger5 6 @impl Oban.Worker7 def perform(%Oban.Job{args: args}) do8 event = args["event"]9 ai_response = args["ai_response"]10 callback_url = args["callback_url"]11 customer_data = args["customer_data"]12 api_key = Application.get_env(:my_ai_app, :modelriver_api_key)13 14 enriched_data = ai_response["data"] || %{}15 16 # 您的自定义业务逻辑17 enriched_data =18 case event do19 "content_ready" ->20 {:ok, content} =21 MyAiApp.Content.create(%{22 title: enriched_data["title"],23 body: enriched_data["description"],24 category: customer_data["category"] || "general"25 })26 27 Map.merge(enriched_data, %{28 "id" => content.id,29 "slug" => content.slug,30 "saved_at" => DateTime.utc_now() |> DateTime.to_iso8601()31 })32 33 _ ->34 enriched_data35 end36 37 # 回调 ModelRiver38 case Req.post(callback_url,39 json: %{40 data: enriched_data,41 task_id: "phoenix_#{event}_#{System.system_time(:second)}",42 metadata: %{43 processed_by: "phoenix",44 processed_at: DateTime.utc_now() |> DateTime.to_iso8601()45 }46 },47 headers: [48 {"authorization", "Bearer #{api_key}"},49 {"content-type", "application/json"}50 ],51 receive_timeout: 10_00052 ) do53 {:ok, %{status: status}} when status in 200..299 ->54 Logger.info("✅ 已针对事件发送回调: #{event}")55 :ok56 57 {:ok, %{status: status, body: body}} ->58 Logger.error("❌ 回调失败 (#{status}): #{inspect(body)}")59 {:error, "回调返回了 #{status}"}60 61 {:error, error} ->62 Logger.error("❌ 回调错误: #{inspect(error)}")63 {:error, error}64 end65 end66endLiveView 集成
使用 PubSub 将最终结果推送到已连接的 LiveView 客户端:
ELIXIR
1# 在您的 Oban worker 中,回调成功后:2Phoenix.PubSub.broadcast(3 MyAiApp.PubSub,4 "ai_results:#{customer_data["user_id"]}",5 {:ai_result, enriched_data}6)7 8# 在您的 LiveView 中:9defmodule MyAiAppWeb.ContentLive do10 use MyAiAppWeb, :live_view11 12 def mount(_params, session, socket) do13 if connected?(socket) do14 user_id = session["user_id"]15 Phoenix.PubSub.subscribe(MyAiApp.PubSub, "ai_results:#{user_id}")16 end17 18 {:ok, assign(socket, result: nil, status: "idle")}19 end20 21 def handle_info({:ai_result, data}, socket) do22 {:noreply, assign(socket, result: data, status: "complete")}23 end24 25 def render(assigns) do26 ~H"""27 <div>28 <p :if={@status == "processing"}>正在处理 AI 请求...</p>29 <pre :if={@result}><%= Jason.encode!(@result, pretty: true) %></pre>30 </div>31 """32 end33end最佳实践
- 使用 Oban:支持重试、遥测监控和死信队列的后台任务。
- 使用 PubSub:无需轮询即可将结果推送给 LiveView 客户端。
- 使用
Plug.Crypto.secure_compare:恒定时间签名验证。 - 使用 Req:现代化的可组合 HTTP 客户端,内置重试支持。
- 只读取 Body 一次:在签名验证后将原始 Body 存储在
conn.assigns中。
下一步
- Spring Boot 事件驱动指南:Java 备选方案
- Phoenix 集成:标准 ModelRiver + Phoenix 的用法
- Webhooks 参考:重试策略和投递监控