Documentation

Event-driven AI with Phoenix

Receive AI webhooks in Phoenix controllers, process with Oban background jobs, and push results to LiveView clients in real time.

Overview

Phoenix's real-time capabilities (channels, LiveView, and PubSub) make it a natural fit for event-driven AI workflows. Receive webhooks in a controller, process them with Oban, and push the final result to connected LiveView clients instantly.

What you'll build:

  • A Phoenix controller endpoint for receiving webhooks
  • HMAC signature verification plug
  • Oban worker for background processing
  • LiveView integration for real-time results

Quick start

Create project

Bash
mix phx.new my_ai_app
cd my_ai_app

Configuration

ELIXIR
# config/runtime.exs
#
# ModelRiver credentials are read from the environment at boot.
#
# In production a missing variable aborts startup with a clear message.
# Otherwise a nil would sit in the app env and only surface at request time,
# when the webhook signature is computed or the callback request is built.
# Outside production the value may be nil so dev/test can boot without secrets.
fetch_modelriver_env = fn name ->
  value = System.get_env(name)

  if is_nil(value) and config_env() == :prod do
    raise "environment variable #{name} is missing"
  end

  value
end

config :my_ai_app,
  modelriver_api_key: fetch_modelriver_env.("MODELRIVER_API_KEY"),
  modelriver_webhook_secret: fetch_modelriver_env.("MODELRIVER_WEBHOOK_SECRET")

Dependencies

ELIXIR
# mix.exs
# Packages this guide adds to the generated Phoenix project.
defp deps do
  [
    # ... other deps
    # Background jobs for webhook processing
    {:oban, "~> 2.17"},
    # HTTP client for the callback request
    {:req, "~> 0.5"}
  ]
end

Signature verification plug

ELIXIR
# lib/my_ai_app_web/plugs/verify_modelriver_signature.ex
defmodule MyAiAppWeb.Plugs.VerifyModelRiverSignature do
  @moduledoc """
  Verifies the HMAC-SHA256 signature of an incoming ModelRiver webhook.

  The lowercase hex HMAC of the raw request body, keyed with the
  `:modelriver_webhook_secret` value from the `:my_ai_app` environment, is
  compared in constant time with the `mr-signature` request header.

  On success the conn is returned with two assigns:

    * `:raw_body` - the request body as read from the connection
    * `:parsed_body` - the JSON-decoded body

  Otherwise the request is halted with a JSON `%{error: message}` response:

    * `401` - signature header missing or not matching
    * `400` - body could not be read, or is not valid JSON
    * `413` - body exceeds the `Plug.Conn.read_body/2` size limit
    * `500` - webhook secret is not configured

  NOTE(review): `Plug.Conn.read_body/2` only yields the body if nothing
  upstream has consumed it. If `Plug.Parsers` runs before this plug, the body
  read here is likely empty and every request is rejected. In that case cache
  the raw body through the `:body_reader` option of `Plug.Parsers`, or run this
  plug ahead of the parsers. Confirm against your endpoint.
  """

  @behaviour Plug

  import Plug.Conn
  require Logger

  @impl Plug
  def init(opts), do: opts

  @impl Plug
  def call(conn, _opts) do
    # Every failing step returns {:error, conn, status, message} so the most
    # recent conn (read_body/2 returns an updated one) is used for the reply.
    with {:ok, secret} <- fetch_secret(conn),
         {:ok, raw_body, conn} <- read_full_body(conn),
         :ok <- check_signature(conn, raw_body, secret),
         {:ok, payload} <- decode_payload(conn, raw_body) do
      conn
      |> assign(:raw_body, raw_body)
      |> assign(:parsed_body, payload)
    else
      {:error, conn, status, message} -> reject(conn, status, message)
    end
  end

  # Fails closed when the secret is absent or empty instead of letting
  # :crypto.mac/4 raise on a nil key.
  defp fetch_secret(conn) do
    case Application.get_env(:my_ai_app, :modelriver_webhook_secret) do
      secret when is_binary(secret) and secret != "" ->
        {:ok, secret}

      _missing ->
        Logger.error("ModelRiver webhook secret is not configured")
        {:error, conn, :internal_server_error, "Webhook secret not configured"}
    end
  end

  # Reads the whole body in one call; a body that does not fit is rejected
  # rather than verified against a partial payload.
  defp read_full_body(conn) do
    case read_body(conn) do
      {:ok, body, conn} ->
        {:ok, body, conn}

      {:more, _partial, conn} ->
        {:error, conn, 413, "Payload too large"}

      {:error, reason} ->
        Logger.error("Failed to read webhook body: #{inspect(reason)}")
        {:error, conn, :bad_request, "Unreadable request body"}
    end
  end

  # Constant-time comparison of the expected hex digest with the header value.
  defp check_signature(conn, raw_body, secret) do
    provided = conn |> get_req_header("mr-signature") |> List.first("")

    expected =
      :hmac
      |> :crypto.mac(:sha256, secret, raw_body)
      |> Base.encode16(case: :lower)

    if Plug.Crypto.secure_compare(expected, provided) do
      :ok
    else
      {:error, conn, :unauthorized, "Invalid signature"}
    end
  end

  # Decodes only after the signature matched, so unauthenticated input is
  # never parsed; malformed JSON becomes a 400 instead of an exception.
  defp decode_payload(conn, raw_body) do
    case Jason.decode(raw_body) do
      {:ok, payload} -> {:ok, payload}
      {:error, _reason} -> {:error, conn, :bad_request, "Invalid JSON payload"}
    end
  end

  defp reject(conn, status, message) do
    conn
    |> put_status(status)
    |> Phoenix.Controller.json(%{error: message})
    |> halt()
  end
end

Webhook controller

ELIXIR
# lib/my_ai_app_web/controllers/webhook_controller.ex
defmodule MyAiAppWeb.WebhookController do
  @moduledoc """
  Receives ModelRiver webhooks.

  Requests first pass through `MyAiAppWeb.Plugs.VerifyModelRiverSignature`;
  this controller reads the decoded payload from `conn.assigns.parsed_body`.

  Payloads of type `"task.ai_generated"` that carry a string `"callback_url"`
  are queued for `MyAiApp.Workers.ProcessAiWebhook`. Any other JSON object is
  logged and acknowledged.
  """

  use MyAiAppWeb, :controller
  require Logger

  alias MyAiApp.Workers.ProcessAiWebhook

  plug MyAiAppWeb.Plugs.VerifyModelRiverSignature

  @doc """
  Handles `POST /webhooks/modelriver`.

  Responds with `%{received: true}` once the payload has been queued or
  logged. Responds with a 500 when the job could not be enqueued, so the
  event is not acknowledged while no job exists for it, and with a 400 when
  the body is not a JSON object.
  """
  def modelriver(conn, _params) do
    handle_payload(conn, conn.assigns.parsed_body)
  end

  # Event-driven task: queue a background job and acknowledge only if the
  # job was actually inserted.
  defp handle_payload(
         conn,
         %{"type" => "task.ai_generated", "callback_url" => callback_url} = payload
       )
       when is_binary(callback_url) do
    job_args = %{
      "event" => payload["event"],
      "ai_response" => payload["ai_response"],
      "callback_url" => callback_url,
      "customer_data" => payload["customer_data"] || %{}
    }

    case job_args |> ProcessAiWebhook.new() |> Oban.insert() do
      {:ok, _job} ->
        json(conn, %{received: true})

      {:error, reason} ->
        Logger.error("Failed to enqueue AI webhook job: #{inspect(reason)}")

        conn
        |> put_status(:internal_server_error)
        |> json(%{error: "Could not queue webhook"})
    end
  end

  # Any other JSON object is a standard webhook: log it and acknowledge.
  defp handle_payload(conn, %{} = payload) do
    Logger.info("Standard webhook received: #{payload["type"]}")
    json(conn, %{received: true})
  end

  # Valid JSON that is not an object (array, string, number, ...).
  defp handle_payload(conn, _other) do
    conn
    |> put_status(:bad_request)
    |> json(%{error: "Expected a JSON object"})
  end
end

Router

ELIXIR
# lib/my_ai_app_web/router.ex
# Webhook routes: served under /webhooks through the :api pipeline, with
# controller names resolved inside the MyAiAppWeb namespace.
scope path: "/webhooks", alias: MyAiAppWeb do
  pipe_through [:api]

  post "/modelriver", WebhookController, :modelriver
end

Oban worker

ELIXIR
# lib/my_ai_app/workers/process_ai_webhook.ex
defmodule MyAiApp.Workers.ProcessAiWebhook do
  @moduledoc """
  Oban worker that post-processes an AI webhook payload and posts the result
  to the payload's callback URL.

  Job args (string keys):

    * `"event"` - event name; `"content_ready"` triggers content creation
    * `"ai_response"` - map whose `"data"` entry is the AI output
    * `"callback_url"` - URL the enriched data is POSTed to
    * `"customer_data"` - optional map; `"category"` is read from it

  Runs on the `:webhooks` queue with at most 3 attempts. `perform/1` returns
  `:ok` after a 2xx callback response and `{:error, reason}` otherwise.

  NOTE(review): an attempt that fails at the callback step re-runs the
  enrichment step on retry, so `"content_ready"` jobs may call
  `MyAiApp.Content.create/1` more than once. Confirm that this is acceptable
  or make the creation idempotent.
  """

  use Oban.Worker, queue: :webhooks, max_attempts: 3
  require Logger

  # Maximum time to wait for the callback response.
  @callback_timeout_ms 10_000

  @impl Oban.Worker
  def perform(%Oban.Job{args: args}) do
    event = args["event"]
    customer_data = args["customer_data"] || %{}
    ai_data = get_in(args, ["ai_response", "data"]) || %{}

    with {:ok, enriched_data} <- enrich(event, ai_data, customer_data) do
      send_callback(args["callback_url"], event, enriched_data)
    end
  end

  # Applies the event-specific business logic and returns the data to report.
  # Assumes MyAiApp.Content.create/1 returns {:ok, content} | {:error, reason}
  # (TODO confirm); a failure is returned as an error tuple instead of raising
  # a MatchError.
  defp enrich("content_ready", data, customer_data) do
    attrs = %{
      title: data["title"],
      body: data["description"],
      category: customer_data["category"] || "general"
    }

    case MyAiApp.Content.create(attrs) do
      {:ok, content} ->
        saved = %{
          "id" => content.id,
          "slug" => content.slug,
          "saved_at" => DateTime.to_iso8601(DateTime.utc_now())
        }

        {:ok, Map.merge(data, saved)}

      {:error, reason} ->
        Logger.error("❌ Content creation failed: #{inspect(reason)}")
        {:error, reason}
    end
  end

  defp enrich(_event, data, _customer_data), do: {:ok, data}

  # Posts the enriched data back to ModelRiver and maps the HTTP outcome onto
  # the worker's return value.
  defp send_callback(callback_url, event, enriched_data) do
    api_key = Application.get_env(:my_ai_app, :modelriver_api_key)

    payload = %{
      data: enriched_data,
      task_id: "phoenix_#{event}_#{System.system_time(:second)}",
      metadata: %{
        processed_by: "phoenix",
        processed_at: DateTime.to_iso8601(DateTime.utc_now())
      }
    }

    request_opts = [
      json: payload,
      headers: [
        {"authorization", "Bearer #{api_key}"},
        {"content-type", "application/json"}
      ],
      receive_timeout: @callback_timeout_ms
    ]

    case Req.post(callback_url, request_opts) do
      {:ok, %{status: status}} when status in 200..299 ->
        Logger.info("✅ Callback sent for event: #{event}")
        :ok

      {:ok, %{status: status, body: body}} ->
        Logger.error("❌ Callback failed (#{status}): #{inspect(body)}")
        {:error, "Callback returned #{status}"}

      {:error, error} ->
        Logger.error("❌ Callback error: #{inspect(error)}")
        {:error, error}
    end
  end
end

LiveView integration

Push the final result to connected LiveView clients using PubSub:

ELIXIR
# In your Oban worker, after successful callback:
# publish the enriched data on the per-user "ai_results:<user_id>" topic.
topic = "ai_results:#{customer_data["user_id"]}"
Phoenix.PubSub.broadcast(MyAiApp.PubSub, topic, {:ai_result, enriched_data})

# In your LiveView:
defmodule MyAiAppWeb.ContentLive do
  @moduledoc """
  Displays the AI result pushed to the current user over PubSub.

  Once the socket is connected, the view subscribes to the
  `"ai_results:<user_id>"` topic, using the `"user_id"` found in the session.
  An `{:ai_result, data}` message stores `data` in the `:result` assign and
  sets `:status` to `"complete"`.
  """

  use MyAiAppWeb, :live_view

  @impl true
  def mount(_params, session, socket) do
    if connected?(socket), do: subscribe_to_results(session["user_id"])

    {:ok, assign(socket, result: nil, status: "idle")}
  end

  @impl true
  def handle_info({:ai_result, data}, socket) do
    {:noreply, assign(socket, result: data, status: "complete")}
  end

  # NOTE(review): nothing in this module sets :status to "processing", so the
  # first paragraph never renders as written; assign it wherever the AI
  # request is started.
  @impl true
  def render(assigns) do
    ~H"""
    <div>
      <p :if={@status == "processing"}>Processing AI request...</p>
      <pre :if={@result}><%= Jason.encode!(@result, pretty: true) %></pre>
    </div>
    """
  end

  # Without a user id the topic would collapse to the shared "ai_results:"
  # string, and this socket would receive every result broadcast without a
  # user id. Skip the subscription in that case.
  defp subscribe_to_results(nil), do: :ok

  defp subscribe_to_results(user_id) do
    Phoenix.PubSub.subscribe(MyAiApp.PubSub, "ai_results:#{user_id}")
  end
end

Best practices

  1. Use Oban: Background jobs with retries, telemetry, and dead-letter queues.
  2. Use PubSub: Push results to LiveView clients without polling.
  3. Use Plug.Crypto.secure_compare: Constant-time signature verification.
  4. Use Req: Modern, composable HTTP client with built-in retry support.
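  5. Read body once: A request body can only be read once, so store the raw body in conn.assigns after signature verification. If Plug.Parsers runs before the verification plug, capture the raw body with its :body_reader option instead, because the body will already have been consumed.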

Next steps