Django 的事件驱动 AI

在 Django views 中接收 AI Webhooks,并通过 Celery 后台任务进行处理,然后回调 ModelRiver:生产级的 Webhook 处理方案。

概述

Django 的“全功能 (Batteries-included)”特性使其处理 ModelRiver 事件驱动 AI Webhooks 变得非常直接。您可以使用 Django views 作为 Webhook 端点,使用 Celery 进行后台处理,并使用 Django REST Framework 进行结构化请求处理。

您将构建的内容:

  • 一个用于接收 ModelRiver 发出的 AI 生成数据的 Webhook 端点
  • HMAC 签名验证辅助函数 (Helper)
  • 用于异步处理和回调的 Celery 任务
  • 用于数据持久化的 Django ORM 集成

快速开始

安装依赖

Bash
pip install django djangorestframework celery requests python-dotenv

配置

PYTHON
1# settings.py
2import os
3 
# ModelRiver credentials are read from the process environment.
# Both fall back to an empty string when the variable is not set.
MODELRIVER_API_KEY = os.environ.get("MODELRIVER_API_KEY", "")
MODELRIVER_WEBHOOK_SECRET = os.environ.get("MODELRIVER_WEBHOOK_SECRET", "")

Webhook 处理程序

View

PYTHON
1# webhooks/views.py
2import hmac
3import hashlib
4import json
5import os
6import requests
7from django.http import JsonResponse
8from django.views.decorators.csrf import csrf_exempt
9from django.views.decorators.http import require_POST
10from django.conf import settings
11 
12 
def verify_webhook_signature(payload_bytes, signature, secret):
    """Verify the ``mr-signature`` header using HMAC-SHA256.

    Args:
        payload_bytes: Raw request body (bytes) exactly as received.
        signature: Hex digest taken from the ``mr-signature`` header.
        secret: Shared webhook secret as a string.

    Returns:
        True only when ``signature`` equals the HMAC-SHA256 hex digest of
        ``payload_bytes`` keyed with ``secret``; False otherwise, including
        when either the secret or the signature is missing/empty.
    """
    # An empty secret would let anyone compute a "valid" signature, so treat
    # a missing secret (or a missing header) as a verification failure.
    if not secret or not signature:
        return False

    expected = hmac.new(
        secret.encode("utf-8"),
        payload_bytes,
        hashlib.sha256,
    ).hexdigest()

    # compare_digest raises TypeError for str arguments containing non-ASCII
    # characters; comparing bytes keeps a malformed header from crashing the
    # request while preserving the constant-time comparison.
    if isinstance(signature, str):
        signature = signature.encode("utf-8")
    return hmac.compare_digest(signature, expected.encode("utf-8"))
21 
22 
@csrf_exempt
@require_POST
def modelriver_webhook(request):
    """Receive a ModelRiver webhook and hand event-driven work to Celery.

    The raw body is authenticated against the ``mr-signature`` header before
    it is parsed. Events of type ``task.ai_generated`` that carry a
    ``callback_url`` are queued on the ``process_ai_webhook`` task; every
    other authenticated event is simply acknowledged.

    Returns:
        JsonResponse: 401 when the signature does not verify, 400 when the
        body is not a JSON object, otherwise 200 with ``{"received": True}``.
    """
    signature = request.headers.get("mr-signature", "")
    raw_body = request.body

    # 1. Verify the webhook signature over the exact bytes that were received.
    if not verify_webhook_signature(raw_body, signature, settings.MODELRIVER_WEBHOOK_SECRET):
        return JsonResponse({"error": "Invalid signature"}, status=401)

    # Malformed or non-object JSON is a client error, not a server error.
    # json.JSONDecodeError and UnicodeDecodeError are both ValueError subclasses.
    try:
        payload = json.loads(raw_body)
    except ValueError:
        return JsonResponse({"error": "Invalid JSON"}, status=400)
    if not isinstance(payload, dict):
        return JsonResponse({"error": "Invalid payload"}, status=400)

    event_type = payload.get("type")
    callback_url = payload.get("callback_url")

    # 2. Handle the event-driven workflow.
    if event_type == "task.ai_generated" and callback_url:
        # Imported at call time rather than at module import.
        from .tasks import process_ai_webhook

        # Queue the work on Celery so the webhook response is not blocked.
        process_ai_webhook.delay(
            event=payload.get("event"),
            ai_response=payload.get("ai_response"),
            callback_url=callback_url,
            customer_data=payload.get("customer_data", {}),
        )

    # Event-driven and standard webhooks are acknowledged the same way.
    return JsonResponse({"received": True}, status=200)

URL 配置

PYTHON
1# webhooks/urls.py
2from django.urls import path
3from . import views
4 
# Route POSTs for /webhooks/modelriver/ to the function-based webhook view.
urlpatterns = [
    path("webhooks/modelriver/", views.modelriver_webhook, name="modelriver_webhook"),
]

Celery 后台任务

PYTHON
1# webhooks/tasks.py
2import requests
3from celery import shared_task
4from django.conf import settings
5from datetime import datetime
6 
7 
@shared_task(bind=True, max_retries=3, default_retry_delay=10)
def process_ai_webhook(self, event, ai_response, callback_url, customer_data):
    """Process an AI response and call back to ModelRiver.

    Args:
        event: Event name from the webhook payload; ``content_ready`` and
            ``entities_extracted`` trigger persistence, anything else is
            forwarded unchanged.
        ai_response: Mapping whose ``data`` entry holds the AI output.
            ``None`` is treated as an empty response.
        callback_url: URL the enriched result is POSTed to.
        customer_data: Caller-supplied metadata mapping. ``None`` is treated
            as empty.

    Raises:
        celery.exceptions.Retry: when the callback request fails and retries
            remain. Once retries are exhausted an error callback is attempted
            and the original request exception propagates.

    Note:
        A retry re-runs the whole task, including the persistence step.
    """
    from django.db import transaction

    # The webhook views forward these values straight from the payload, so
    # either one can be None when the sender omits or nulls the field.
    enriched_data = {**((ai_response or {}).get("data") or {})}
    customer_data = customer_data or {}

    auth_headers = {
        "Authorization": f"Bearer {settings.MODELRIVER_API_KEY}",
        "Content-Type": "application/json",
    }

    # 3. Event-specific business logic. The writes share one transaction so a
    # failure part-way through (e.g. a malformed entity) leaves no partial rows.
    with transaction.atomic():
        if event == "content_ready":
            from myapp.models import Content

            content = Content.objects.create(
                title=enriched_data.get("title", ""),
                body=enriched_data.get("description", ""),
                category=customer_data.get("category", "general"),
                source="modelriver",
            )
            enriched_data["id"] = str(content.id)
            enriched_data["slug"] = content.slug
            # astimezone() attaches the local UTC offset so the timestamp is
            # unambiguous for the receiver.
            enriched_data["saved_at"] = datetime.now().astimezone().isoformat()

        elif event == "entities_extracted":
            from myapp.models import Entity

            entities = enriched_data.get("entities") or []
            for entity in entities:
                Entity.objects.create(
                    name=entity["name"],
                    entity_type=entity["type"],
                    source_id=customer_data.get("document_id"),
                )
            enriched_data["entities_saved"] = len(entities)

    try:
        # 4. Call back to ModelRiver with the enriched payload.
        processed_at = datetime.now().astimezone()
        response = requests.post(
            callback_url,
            json={
                "data": enriched_data,
                "task_id": f"django_{event}_{processed_at.strftime('%Y%m%d%H%M%S')}",
                "metadata": {
                    "processed_by": "django",
                    "processed_at": processed_at.isoformat(),
                },
            },
            headers=auth_headers,
            timeout=10,
        )
        response.raise_for_status()
        print(f"✅ 已针对事件发送回调: {event}")

    except requests.exceptions.RequestException as exc:
        print(f"❌ 回调失败: {exc}")

        # Report the failure only on the final attempt; while retries remain,
        # a later attempt may still succeed and an error callback would
        # contradict it.
        if self.request.retries >= self.max_retries:
            try:
                requests.post(
                    callback_url,
                    json={
                        "error": "processing_failed",
                        "message": str(exc),
                    },
                    headers=auth_headers,
                    timeout=10,
                )
            except requests.exceptions.RequestException as notify_exc:
                # Best effort: the endpoint is already failing, so record the
                # problem instead of masking the original exception.
                print(f"❌ 错误回调发送失败: {notify_exc}")

        raise self.retry(exc=exc)

Django REST Framework

使用 DRF Serializers 的更结构化方法:

PYTHON
1# webhooks/serializers.py
2from rest_framework import serializers
3 
4 
class AIResponseSerializer(serializers.Serializer):
    """Validate the ``ai_response`` object of a webhook payload."""

    # Free-form mapping holding the AI output.
    data = serializers.DictField()
7 
8 
class WebhookPayloadSerializer(serializers.Serializer):
    """Validate the top-level ModelRiver webhook payload.

    Only ``type`` and ``channel_id`` are required; every other field is
    optional.
    """

    type = serializers.CharField()
    event = serializers.CharField(required=False)
    channel_id = serializers.CharField()
    ai_response = AIResponseSerializer(required=False)
    callback_url = serializers.URLField(required=False)
    callback_required = serializers.BooleanField(required=False)
    # A callable default yields a fresh dict per use instead of sharing one
    # mutable ``{}`` across every validation.
    customer_data = serializers.DictField(required=False, default=dict)
    timestamp = serializers.DateTimeField(required=False)
18 
19 
20# webhooks/views.py
21from rest_framework.views import APIView
22from rest_framework.response import Response
23from rest_framework import status
24 
25 
class ModelRiverWebhookView(APIView):
    """DRF endpoint that receives ModelRiver webhooks.

    Requests are authenticated by the ``mr-signature`` HMAC header, so DRF's
    own authentication and permission classes are disabled.
    """

    authentication_classes = []  # Webhooks are authenticated by signature instead
    permission_classes = []

    def post(self, request):
        """Verify, validate and dispatch one webhook delivery.

        Returns:
            Response: 401 when the signature does not verify, otherwise 200
            with ``{"received": True}``. An invalid payload raises DRF's
            ``ValidationError`` via ``is_valid(raise_exception=True)``.
        """
        # These names are not imported by this snippet's import header, so
        # bring them into scope here. ``verify_webhook_signature`` is expected
        # to be defined in this same views module.
        from django.conf import settings

        from .tasks import process_ai_webhook

        signature = request.headers.get("mr-signature", "")
        # Read the raw body before ``request.data`` so the signature is
        # checked against the exact bytes that were sent.
        raw_body = request.body

        if not verify_webhook_signature(raw_body, signature, settings.MODELRIVER_WEBHOOK_SECRET):
            return Response({"error": "Invalid signature"}, status=status.HTTP_401_UNAUTHORIZED)

        serializer = WebhookPayloadSerializer(data=request.data)
        serializer.is_valid(raise_exception=True)

        data = serializer.validated_data
        if data["type"] == "task.ai_generated" and data.get("callback_url"):
            process_ai_webhook.delay(
                event=data.get("event"),
                ai_response=data.get("ai_response"),
                callback_url=data["callback_url"],
                customer_data=data.get("customer_data") or {},
            )

        return Response({"received": True}, status=status.HTTP_200_OK)

触发异步请求

PYTHON
1# ai/client.py
2import requests
3from django.conf import settings
4 
5 
def trigger_async_ai(workflow: str, prompt: str, metadata: "dict | None" = None) -> dict:
    """Trigger an event-driven AI request.

    Args:
        workflow: Name of the workflow to run.
        prompt: Text sent as the single user message.
        metadata: Optional mapping forwarded with the request; an empty
            mapping is sent when omitted.

    Returns:
        The decoded JSON body of the API response.

    Raises:
        requests.exceptions.RequestException: on connection problems,
            timeouts, or a non-2xx response (via ``raise_for_status``).
    """
    response = requests.post(
        "https://api.modelriver.com/v1/ai/async",
        headers={
            "Authorization": f"Bearer {settings.MODELRIVER_API_KEY}",
            "Content-Type": "application/json",
        },
        json={
            "workflow": workflow,
            "messages": [{"role": "user", "content": prompt}],
            "metadata": metadata or {},
        },
        timeout=10,
    )
    response.raise_for_status()
    return response.json()

最佳实践

  1. 为处理过程使用 Celery:绝不要让繁重的逻辑阻塞 Webhook 响应。
  2. 设置任务重试:在 Celery 任务上使用 max_retries 和 default_retry_delay。
  3. 首先验证签名:在处理之前,务必先校验 mr-signature
  4. 使用 Django ORM 事务:为了保证一致性,请将数据库写入操作封装在 transaction.atomic() 中。
  5. 使用请求日志进行监控:在 可观测性 (Observability) 中追踪事件驱动流程。

下一步