Overview
Phoenix's real-time capabilities: channels, LiveView, and PubSub: make it a natural fit for event-driven AI workflows. Receive webhooks in a controller, process with Oban, and push the final result to connected LiveView clients instantly.
What you'll build:
- A Phoenix controller endpoint for receiving webhooks
- HMAC signature verification plug
- Oban worker for background processing
- LiveView integration for real-time results
Quick start
Create project
Bash
mix phx.new my_ai_appcd my_ai_appConfiguration
ELIXIR
1# config/runtime.exs2config :my_ai_app,3 modelriver_api_key: System.get_env("MODELRIVER_API_KEY"),4 modelriver_webhook_secret: System.get_env("MODELRIVER_WEBHOOK_SECRET")Dependencies
ELIXIR
1# mix.exs2defp deps do3 [4 {:oban, "~> 2.17"},5 {:req, "~> 0.5"},6 # ... other deps7 ]8endSignature verification plug
ELIXIR
1# lib/my_ai_app_web/plugs/verify_modelriver_signature.ex2defmodule MyAiAppWeb.Plugs.VerifyModelRiverSignature do3 import Plug.Conn4 require Logger5 6 def init(opts), do: opts7 8 def call(conn, _opts) do9 signature = get_req_header(conn, "mr-signature") |> List.first("")10 secret = Application.get_env(:my_ai_app, :modelriver_webhook_secret)11 12 {:ok, raw_body, conn} = Plug.Conn.read_body(conn)13 14 expected =15 :crypto.mac(:hmac, :sha256, secret, raw_body)16 |> Base.encode16(case: :lower)17 18 if Plug.Crypto.secure_compare(expected, signature) do19 conn20 |> assign(:raw_body, raw_body)21 |> assign(:parsed_body, Jason.decode!(raw_body))22 else23 conn24 |> put_status(:unauthorized)25 |> Phoenix.Controller.json(%{error: "Invalid signature"})26 |> halt()27 end28 end29endWebhook controller
ELIXIR
1# lib/my_ai_app_web/controllers/webhook_controller.ex2defmodule MyAiAppWeb.WebhookController do3 use MyAiAppWeb, :controller4 require Logger5 6 plug MyAiAppWeb.Plugs.VerifyModelRiverSignature7 8 def modelriver(conn, _params) do9 payload = conn.assigns.parsed_body10 11 case {payload["type"], payload["callback_url"]} do12 {"task.ai_generated", callback_url} when is_binary(callback_url) ->13 # Queue background job14 %{15 "event" => payload["event"],16 "ai_response" => payload["ai_response"],17 "callback_url" => callback_url,18 "customer_data" => payload["customer_data"] || %{}19 }20 |> MyAiApp.Workers.ProcessAiWebhook.new()21 |> Oban.insert()22 23 json(conn, %{received: true})24 25 _ ->26 Logger.info("Standard webhook received: #{payload["type"]}")27 json(conn, %{received: true})28 end29 end30endRouter
ELIXIR
1# lib/my_ai_app_web/router.ex2scope "/webhooks", MyAiAppWeb do3 pipe_through :api4 post "/modelriver", WebhookController, :modelriver5endOban worker
ELIXIR
1# lib/my_ai_app/workers/process_ai_webhook.ex2defmodule MyAiApp.Workers.ProcessAiWebhook do3 use Oban.Worker, queue: :webhooks, max_attempts: 34 require Logger5 6 @impl Oban.Worker7 def perform(%Oban.Job{args: args}) do8 event = args["event"]9 ai_response = args["ai_response"]10 callback_url = args["callback_url"]11 customer_data = args["customer_data"]12 api_key = Application.get_env(:my_ai_app, :modelriver_api_key)13 14 enriched_data = ai_response["data"] || %{}15 16 # Your custom business logic17 enriched_data =18 case event do19 "content_ready" ->20 {:ok, content} =21 MyAiApp.Content.create(%{22 title: enriched_data["title"],23 body: enriched_data["description"],24 category: customer_data["category"] || "general"25 })26 27 Map.merge(enriched_data, %{28 "id" => content.id,29 "slug" => content.slug,30 "saved_at" => DateTime.utc_now() |> DateTime.to_iso8601()31 })32 33 _ ->34 enriched_data35 end36 37 # Call back to ModelRiver38 case Req.post(callback_url,39 json: %{40 data: enriched_data,41 task_id: "phoenix_#{event}_#{System.system_time(:second)}",42 metadata: %{43 processed_by: "phoenix",44 processed_at: DateTime.utc_now() |> DateTime.to_iso8601()45 }46 },47 headers: [48 {"authorization", "Bearer #{api_key}"},49 {"content-type", "application/json"}50 ],51 receive_timeout: 10_00052 ) do53 {:ok, %{status: status}} when status in 200..299 ->54 Logger.info("✅ Callback sent for event: #{event}")55 :ok56 57 {:ok, %{status: status, body: body}} ->58 Logger.error("❌ Callback failed (#{status}): #{inspect(body)}")59 {:error, "Callback returned #{status}"}60 61 {:error, error} ->62 Logger.error("❌ Callback error: #{inspect(error)}")63 {:error, error}64 end65 end66endLiveView integration
Push the final result to connected LiveView clients using PubSub:
ELIXIR
1# In your Oban worker, after successful callback:2Phoenix.PubSub.broadcast(3 MyAiApp.PubSub,4 "ai_results:#{customer_data["user_id"]}",5 {:ai_result, enriched_data}6)7 8# In your LiveView:9defmodule MyAiAppWeb.ContentLive do10 use MyAiAppWeb, :live_view11 12 def mount(_params, session, socket) do13 if connected?(socket) do14 user_id = session["user_id"]15 Phoenix.PubSub.subscribe(MyAiApp.PubSub, "ai_results:#{user_id}")16 end17 18 {:ok, assign(socket, result: nil, status: "idle")}19 end20 21 def handle_info({:ai_result, data}, socket) do22 {:noreply, assign(socket, result: data, status: "complete")}23 end24 25 def render(assigns) do26 ~H"""27 <div>28 <p :if={@status == "processing"}>Processing AI request...</p>29 <pre :if={@result}><%= Jason.encode!(@result, pretty: true) %></pre>30 </div>31 """32 end33endBest practices
- Use Oban: Background jobs with retries, telemetry, and dead-letter queues.
- Use PubSub: Push results to LiveView clients without polling.
- Use
Plug.Crypto.secure_compare: Constant-time signature verification. - Use Req: Modern, composable HTTP client with built-in retry support.
- Read body once: Store the raw body in
conn.assignsafter signature verification.
Next steps
- Spring Boot event-driven guide: Java alternative
- Phoenix integration: Standard ModelRiver + Phoenix usage
- Webhooks reference: Retry policies and delivery monitoring