Documentation

Event-driven AI with Django

Receive AI webhooks in Django views, process with Celery background tasks, and call back to ModelRiver: production-ready webhook handling.

Overview

Django's batteries-included approach makes it straightforward to handle ModelRiver event-driven AI webhooks. Use Django views for the webhook endpoint, Celery for background processing, and Django REST Framework for structured request handling.

What you'll build:

  • A webhook endpoint that receives AI-generated data from ModelRiver
  • HMAC signature verification decorator
  • Celery tasks for async processing and callback
  • Django ORM integration for data persistence

Quick start

Install dependencies

Bash
pip install django djangorestframework celery requests python-dotenv

Configuration

PYTHON
1# settings.py
2import os
3 
4MODELRIVER_API_KEY = os.environ.get("MODELRIVER_API_KEY", "")
5MODELRIVER_WEBHOOK_SECRET = os.environ.get("MODELRIVER_WEBHOOK_SECRET", "")

Webhook handler

View

PYTHON
1# webhooks/views.py
2import hmac
3import hashlib
4import json
5import os
6import requests
7from django.http import JsonResponse
8from django.views.decorators.csrf import csrf_exempt
9from django.views.decorators.http import require_POST
10from django.conf import settings
11 
12 
13def verify_webhook_signature(payload_bytes, signature, secret):
14 """Verify the mr-signature header using HMAC-SHA256."""
15 expected = hmac.new(
16 secret.encode("utf-8"),
17 payload_bytes,
18 hashlib.sha256,
19 ).hexdigest()
20 return hmac.compare_digest(signature, expected)
21 
22 
23@csrf_exempt
24@require_POST
25def modelriver_webhook(request):
26 signature = request.headers.get("mr-signature", "")
27 raw_body = request.body
28 
29 # 1. Verify webhook signature
30 if not verify_webhook_signature(raw_body, signature, settings.MODELRIVER_WEBHOOK_SECRET):
31 return JsonResponse({"error": "Invalid signature"}, status=401)
32 
33 payload = json.loads(raw_body)
34 event_type = payload.get("type")
35 callback_url = payload.get("callback_url")
36 
37 # 2. Handle event-driven workflow
38 if event_type == "task.ai_generated" and callback_url:
39 from .tasks import process_ai_webhook
40 
41 # Queue background task with Celery
42 process_ai_webhook.delay(
43 event=payload.get("event"),
44 ai_response=payload.get("ai_response"),
45 callback_url=callback_url,
46 customer_data=payload.get("customer_data", {}),
47 )
48 return JsonResponse({"received": True}, status=200)
49 
50 # Standard webhook
51 return JsonResponse({"received": True}, status=200)

URL configuration

PYTHON
1# webhooks/urls.py
2from django.urls import path
3from . import views
4 
5urlpatterns = [
6 path("webhooks/modelriver/", views.modelriver_webhook, name="modelriver_webhook"),
7]

Celery background task

PYTHON
1# webhooks/tasks.py
2import requests
3from celery import shared_task
4from django.conf import settings
5from datetime import datetime
6 
7 
8@shared_task(bind=True, max_retries=3, default_retry_delay=10)
9def process_ai_webhook(self, event, ai_response, callback_url, customer_data):
10 """Process AI response and call back to ModelRiver."""
11 try:
12 enriched_data = {**ai_response.get("data", {})}
13 
14 # 3. Your custom business logic based on the event
15 if event == "content_ready":
16 from myapp.models import Content
17 
18 content = Content.objects.create(
19 title=enriched_data.get("title", ""),
20 body=enriched_data.get("description", ""),
21 category=customer_data.get("category", "general"),
22 source="modelriver",
23 )
24 enriched_data["id"] = str(content.id)
25 enriched_data["slug"] = content.slug
26 enriched_data["saved_at"] = datetime.now().isoformat()
27 
28 if event == "entities_extracted":
29 from myapp.models import Entity
30 
31 for entity in enriched_data.get("entities", []):
32 Entity.objects.create(
33 name=entity["name"],
34 entity_type=entity["type"],
35 source_id=customer_data.get("document_id"),
36 )
37 enriched_data["entities_saved"] = len(enriched_data.get("entities", []))
38 
39 # 4. Call back to ModelRiver
40 response = requests.post(
41 callback_url,
42 json={
43 "data": enriched_data,
44 "task_id": f"django_{event}_{datetime.now().strftime('%Y%m%d%H%M%S')}",
45 "metadata": {
46 "processed_by": "django",
47 "processed_at": datetime.now().isoformat(),
48 },
49 },
50 headers={
51 "Authorization": f"Bearer {settings.MODELRIVER_API_KEY}",
52 "Content-Type": "application/json",
53 },
54 timeout=10,
55 )
56 response.raise_for_status()
57 print(f"✅ Callback sent for event: {event}")
58 
59 except requests.exceptions.RequestException as exc:
60 print(f"❌ Callback failed: {exc}")
61 # Send error callback
62 try:
63 requests.post(
64 callback_url,
65 json={
66 "error": "processing_failed",
67 "message": str(exc),
68 },
69 headers={
70 "Authorization": f"Bearer {settings.MODELRIVER_API_KEY}",
71 "Content-Type": "application/json",
72 },
73 timeout=10,
74 )
75 except Exception:
76 pass
77 raise self.retry(exc=exc)

Django REST Framework

For a more structured approach using DRF serializers:

PYTHON
1# webhooks/serializers.py
2from rest_framework import serializers
3 
4 
5class AIResponseSerializer(serializers.Serializer):
6 data = serializers.DictField()
7 
8 
9class WebhookPayloadSerializer(serializers.Serializer):
10 type = serializers.CharField()
11 event = serializers.CharField(required=False)
12 channel_id = serializers.CharField()
13 ai_response = AIResponseSerializer(required=False)
14 callback_url = serializers.URLField(required=False)
15 callback_required = serializers.BooleanField(required=False)
16 customer_data = serializers.DictField(required=False, default={})
17 timestamp = serializers.DateTimeField(required=False)
18 
19 
20# webhooks/views.py
21from rest_framework.views import APIView
22from rest_framework.response import Response
23from rest_framework import status
24 
25 
26class ModelRiverWebhookView(APIView):
27 authentication_classes = [] # Webhook uses signature verification instead
28 permission_classes = []
29 
30 def post(self, request):
31 signature = request.headers.get("mr-signature", "")
32 raw_body = request.body
33 
34 if not verify_webhook_signature(raw_body, signature, settings.MODELRIVER_WEBHOOK_SECRET):
35 return Response({"error": "Invalid signature"}, status=status.HTTP_401_UNAUTHORIZED)
36 
37 serializer = WebhookPayloadSerializer(data=request.data)
38 serializer.is_valid(raise_exception=True)
39 
40 data = serializer.validated_data
41 if data["type"] == "task.ai_generated" and data.get("callback_url"):
42 process_ai_webhook.delay(
43 event=data.get("event"),
44 ai_response=data.get("ai_response"),
45 callback_url=data["callback_url"],
46 customer_data=data.get("customer_data", {}),
47 )
48 
49 return Response({"received": True}, status=status.HTTP_200_OK)

Triggering async requests

PYTHON
1# ai/client.py
2import requests
3from django.conf import settings
4 
5 
6def trigger_async_ai(workflow: str, prompt: str, metadata: dict = None) -> dict:
7 """Trigger an event-driven AI request."""
8 response = requests.post(
9 "https://api.modelriver.com/v1/ai/async",
10 headers={
11 "Authorization": f"Bearer {settings.MODELRIVER_API_KEY}",
12 "Content-Type": "application/json",
13 },
14 json={
15 "workflow": workflow,
16 "messages": [{"role": "user", "content": prompt}],
17 "metadata": metadata or {},
18 },
19 timeout=10,
20 )
21 response.raise_for_status()
22 return response.json()

Best practices

  1. Use Celery for processing: Never block the webhook response with heavy logic.
  2. Set task retries: Use max_retries and default_retry_delay on Celery tasks.
  3. Verify signatures first: Always check mr-signature before processing.
  4. Use Django ORM transactions: Wrap database writes in transaction.atomic() for consistency.
  5. Monitor with Request Logs: Track event-driven flows in Observability.

Next steps