Phoenix 的事件驱动 AI

在 Phoenix 控制器中接收 AI Webhooks,利用 Oban 后台任务进行处理,并将结果实时推送到 LiveView 客户端。

概述

Phoenix 的实时处理能力(如 Channels、LiveView 和 PubSub)使其非常适合事件驱动的 AI 工作流。您可以在控制器中接收 Webhooks,使用 Oban 进行处理,并立即将最终结果推送到已连接的 LiveView 客户端。

您将构建的内容:

  • 一个用于接收 Webhooks 的 Phoenix 控制器端点
  • HMAC 签名验证 Plug
  • 用于后台处理的 Oban Worker
  • 用于实时展示结果的 LiveView 集成

快速开始

创建项目

Bash
mix phx.new my_ai_app
cd my_ai_app

配置

ELIXIR
1# config/runtime.exs
2config :my_ai_app,
3 modelriver_api_key: System.get_env("MODELRIVER_API_KEY"),
4 modelriver_webhook_secret: System.get_env("MODELRIVER_WEBHOOK_SECRET")

依赖项

ELIXIR
1# mix.exs
2defp deps do
3 [
4 {:oban, "~> 2.17"},
5 {:req, "~> 0.5"},
6 # ... 其他依赖
7 ]
8end

签名验证 Plug

ELIXIR
1# lib/my_ai_app_web/plugs/verify_modelriver_signature.ex
2defmodule MyAiAppWeb.Plugs.VerifyModelRiverSignature do
3 import Plug.Conn
4 require Logger
5 
6 def init(opts), do: opts
7 
8 def call(conn, _opts) do
9 signature = get_req_header(conn, "mr-signature") |> List.first("")
10 secret = Application.get_env(:my_ai_app, :modelriver_webhook_secret)
11 
12 {:ok, raw_body, conn} = Plug.Conn.read_body(conn)
13 
14 expected =
15 :crypto.mac(:hmac, :sha256, secret, raw_body)
16 |> Base.encode16(case: :lower)
17 
18 if Plug.Crypto.secure_compare(expected, signature) do
19 conn
20 |> assign(:raw_body, raw_body)
21 |> assign(:parsed_body, Jason.decode!(raw_body))
22 else
23 conn
24 |> put_status(:unauthorized)
25 |> Phoenix.Controller.json(%{error: "Invalid signature"})
26 |> halt()
27 end
28 end
29end

Webhook 控制器

ELIXIR
1# lib/my_ai_app_web/controllers/webhook_controller.ex
2defmodule MyAiAppWeb.WebhookController do
3 use MyAiAppWeb, :controller
4 require Logger
5 
6 plug MyAiAppWeb.Plugs.VerifyModelRiverSignature
7 
8 def modelriver(conn, _params) do
9 payload = conn.assigns.parsed_body
10 
11 case {payload["type"], payload["callback_url"]} do
12 {"task.ai_generated", callback_url} when is_binary(callback_url) ->
13 # 将后台任务加入队列
14 %{
15 "event" => payload["event"],
16 "ai_response" => payload["ai_response"],
17 "callback_url" => callback_url,
18 "customer_data" => payload["customer_data"] || %{}
19 }
20 |> MyAiApp.Workers.ProcessAiWebhook.new()
21 |> Oban.insert()
22 
23 json(conn, %{received: true})
24 
25 _ ->
26 Logger.info("收到标准 Webhook: #{payload["type"]}")
27 json(conn, %{received: true})
28 end
29 end
30end

路由

ELIXIR
1# lib/my_ai_app_web/router.ex
2scope "/webhooks", MyAiAppWeb do
3 pipe_through :api
4 post "/modelriver", WebhookController, :modelriver
5end

Oban Worker

ELIXIR
1# lib/my_ai_app/workers/process_ai_webhook.ex
2defmodule MyAiApp.Workers.ProcessAiWebhook do
3 use Oban.Worker, queue: :webhooks, max_attempts: 3
4 require Logger
5 
6 @impl Oban.Worker
7 def perform(%Oban.Job{args: args}) do
8 event = args["event"]
9 ai_response = args["ai_response"]
10 callback_url = args["callback_url"]
11 customer_data = args["customer_data"]
12 api_key = Application.get_env(:my_ai_app, :modelriver_api_key)
13 
14 enriched_data = ai_response["data"] || %{}
15 
16 # 您的自定义业务逻辑
17 enriched_data =
18 case event do
19 "content_ready" ->
20 {:ok, content} =
21 MyAiApp.Content.create(%{
22 title: enriched_data["title"],
23 body: enriched_data["description"],
24 category: customer_data["category"] || "general"
25 })
26 
27 Map.merge(enriched_data, %{
28 "id" => content.id,
29 "slug" => content.slug,
30 "saved_at" => DateTime.utc_now() |> DateTime.to_iso8601()
31 })
32 
33 _ ->
34 enriched_data
35 end
36 
37 # 回调 ModelRiver
38 case Req.post(callback_url,
39 json: %{
40 data: enriched_data,
41 task_id: "phoenix_#{event}_#{System.system_time(:second)}",
42 metadata: %{
43 processed_by: "phoenix",
44 processed_at: DateTime.utc_now() |> DateTime.to_iso8601()
45 }
46 },
47 headers: [
48 {"authorization", "Bearer #{api_key}"},
49 {"content-type", "application/json"}
50 ],
51 receive_timeout: 10_000
52 ) do
53 {:ok, %{status: status}} when status in 200..299 ->
54 Logger.info("✅ 已针对事件发送回调: #{event}")
55 :ok
56 
57 {:ok, %{status: status, body: body}} ->
58 Logger.error("❌ 回调失败 (#{status}): #{inspect(body)}")
59 {:error, "回调返回了 #{status}"}
60 
61 {:error, error} ->
62 Logger.error("❌ 回调错误: #{inspect(error)}")
63 {:error, error}
64 end
65 end
66end

LiveView 集成

使用 PubSub 将最终结果推送到已连接的 LiveView 客户端:

ELIXIR
1# 在您的 Oban worker 中,回调成功后:
2Phoenix.PubSub.broadcast(
3 MyAiApp.PubSub,
4 "ai_results:#{customer_data["user_id"]}",
5 {:ai_result, enriched_data}
6)
7 
8# 在您的 LiveView 中:
9defmodule MyAiAppWeb.ContentLive do
10 use MyAiAppWeb, :live_view
11 
12 def mount(_params, session, socket) do
13 if connected?(socket) do
14 user_id = session["user_id"]
15 Phoenix.PubSub.subscribe(MyAiApp.PubSub, "ai_results:#{user_id}")
16 end
17 
18 {:ok, assign(socket, result: nil, status: "idle")}
19 end
20 
21 def handle_info({:ai_result, data}, socket) do
22 {:noreply, assign(socket, result: data, status: "complete")}
23 end
24 
25 def render(assigns) do
26 ~H"""
27 <div>
28 <p :if={@status == "processing"}> AI ...</p>
29 <pre :if={@result}><%= Jason.encode!(@result, pretty: true) %></pre>
30 </div>
31 """
32 end
33end

最佳实践

  1. 使用 Oban:支持重试、遥测监控和死信队列的后台任务。
  2. 使用 PubSub:无需轮询即可将结果推送给 LiveView 客户端。
  3. 使用 Plug.Crypto.secure_compare:恒定时间签名验证。
  4. 使用 Req:现代化的可组合 HTTP 客户端,内置重试支持。
  5. 只读取 Body 一次:在签名验证后将原始 Body 存储在 conn.assigns 中。

下一步