Django + ModelRiver

Add AI capabilities to your Django application: REST API views, Celery background tasks, and Django Channels for real-time streaming.

Overview

Django is Python's batteries-included web framework. With a few lines of configuration, you can add ModelRiver's AI capabilities to your Django views, REST APIs, and background tasks.


Quick start

Install dependencies

Bash
pip install django djangorestframework celery channels openai python-dotenv

Configuration

Python
# settings.py
import os

MODELRIVER_API_KEY = os.environ.get("MODELRIVER_API_KEY", "")
MODELRIVER_BASE_URL = "https://api.modelriver.com/v1"
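The install step above pulls in python-dotenv; if you keep MODELRIVER_API_KEY in a .env file, settings.py can load it before reading the environment. A minimal sketch — the .env location at the project root is an assumption:

```python
# settings.py (top of file)
from pathlib import Path
from dotenv import load_dotenv

# Load .env from the project root so os.environ.get() below can see it
load_dotenv(Path(__file__).resolve().parent.parent / ".env")
```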

AI client service

Python
# ai/client.py
from openai import OpenAI
from django.conf import settings

def get_client():
    return OpenAI(
        base_url=settings.MODELRIVER_BASE_URL,
        api_key=settings.MODELRIVER_API_KEY,
    )

def chat(workflow: str, messages: list, **kwargs) -> str:
    client = get_client()
    response = client.chat.completions.create(
        model=workflow,
        messages=messages,
        **kwargs,
    )
    return response.choices[0].message.content

Django views

Python
# views.py
import json

from django.http import JsonResponse
from django.views.decorators.csrf import csrf_exempt
from django.views.decorators.http import require_POST

from ai.client import chat

@csrf_exempt
@require_POST
def chat_view(request):
    body = json.loads(request.body)
    message = body.get("message", "")

    response = chat(
        workflow="my-chat-workflow",
        messages=[{"role": "user", "content": message}],
    )

    return JsonResponse({"content": response})
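The view still needs a URL route; a minimal urls.py sketch (the /api/chat/ path and app layout are assumptions):

```python
# urls.py
from django.urls import path

from . import views

urlpatterns = [
    path("api/chat/", views.chat_view, name="chat"),
]
```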

Django REST Framework

Python
# serializers.py
from rest_framework import serializers

class ChatSerializer(serializers.Serializer):
    message = serializers.CharField(max_length=4000)
    workflow = serializers.CharField(default="my-chat-workflow")

class ChatResponseSerializer(serializers.Serializer):
    content = serializers.CharField()
    tokens = serializers.IntegerField()

# views.py
from rest_framework.views import APIView
from rest_framework.response import Response
from openai import OpenAI
from django.conf import settings

from .serializers import ChatSerializer

class ChatAPIView(APIView):
    def post(self, request):
        serializer = ChatSerializer(data=request.data)
        serializer.is_valid(raise_exception=True)

        client = OpenAI(
            base_url=settings.MODELRIVER_BASE_URL,
            api_key=settings.MODELRIVER_API_KEY,
        )

        response = client.chat.completions.create(
            model=serializer.validated_data["workflow"],
            messages=[{"role": "user", "content": serializer.validated_data["message"]}],
        )

        return Response({
            "content": response.choices[0].message.content,
            "tokens": response.usage.total_tokens,
        })

Celery background tasks (Celery background tasks)

For heavy AI workloads, use Celery so that long-running calls don't block your regular web workers:

Python
# tasks.py
from celery import shared_task
from openai import OpenAI
from django.conf import settings

@shared_task
def summarise_document(doc_id: int):
    from myapp.models import Document

    doc = Document.objects.get(id=doc_id)

    client = OpenAI(
        base_url=settings.MODELRIVER_BASE_URL,
        api_key=settings.MODELRIVER_API_KEY,
    )

    response = client.chat.completions.create(
        model="my-summary-workflow",
        messages=[
            {"role": "system", "content": "Summarise the following document concisely."},
            {"role": "user", "content": doc.content},
        ],
    )

    doc.summary = response.choices[0].message.content
    doc.save()
    return doc.summary

# Enqueue the task from a view (views.py, imports as above)
@csrf_exempt
@require_POST
def summarise_view(request, doc_id):
    summarise_document.delay(doc_id)
    return JsonResponse({"status": "processing"})
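Calling .delay() assumes a configured Celery app. A minimal celery.py sketch — the project name is illustrative, and broker settings would live in settings.py under the CELERY_ namespace:

```python
# myproject/celery.py
import os

from celery import Celery

# Point Celery at the Django settings module before creating the app
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "myproject.settings")

app = Celery("myproject")
app.config_from_object("django.conf:settings", namespace="CELERY")
app.autodiscover_tasks()  # find tasks.py modules in installed apps
```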

Streaming with Django Channels

Python
# consumers.py
import json

from channels.generic.websocket import AsyncWebsocketConsumer
from openai import AsyncOpenAI
from django.conf import settings

class ChatConsumer(AsyncWebsocketConsumer):
    async def connect(self):
        await self.accept()
        self.messages = []
        self.client = AsyncOpenAI(
            base_url=settings.MODELRIVER_BASE_URL,
            api_key=settings.MODELRIVER_API_KEY,
        )

    async def receive(self, text_data=None, bytes_data=None):
        data = json.loads(text_data)
        self.messages.append({"role": "user", "content": data["message"]})

        stream = await self.client.chat.completions.create(
            model="my-chat-workflow",
            messages=self.messages,
            stream=True,
        )

        full_response = ""
        async for chunk in stream:
            content = chunk.choices[0].delta.content
            if content:
                full_response += content
                await self.send(json.dumps({"type": "chunk", "content": content}))

        self.messages.append({"role": "assistant", "content": full_response})
        await self.send(json.dumps({"type": "done"}))
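The consumer has to be wired into the ASGI application; a minimal asgi.py sketch, following the standard Channels layout (the ws/chat/ path and the myproject/myapp module names are assumptions):

```python
# asgi.py
import os

from django.core.asgi import get_asgi_application

os.environ.setdefault("DJANGO_SETTINGS_MODULE", "myproject.settings")
django_asgi_app = get_asgi_application()  # initialise Django before importing consumers

from channels.routing import ProtocolTypeRouter, URLRouter
from django.urls import path

from myapp.consumers import ChatConsumer

application = ProtocolTypeRouter({
    "http": django_asgi_app,
    "websocket": URLRouter([
        path("ws/chat/", ChatConsumer.as_asgi()),
    ]),
})
```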

Best practices

  1. Use Celery for heavy work: summarisation, embedding generation, and deep analysis should run asynchronously in the background.
  2. Create a reusable AI service module: centralise ModelRiver configuration and calls in a single module.
  3. Add error handling: catch openai.APIError and return clear error responses to your frontend users.
  4. Use Django Channels for streaming: standard synchronous views cannot efficiently push Server-Sent Events (SSE) or WebSocket streams.
  5. Monitor request logs: use the Observability dashboard to see what each view costs.
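For practice 3, a transient upstream error is often worth a retry or two before surfacing it to the user. A minimal backoff wrapper — a sketch, where with_retry is a hypothetical helper and production code would catch openai.APIError rather than bare Exception:

```python
import time

def with_retry(fn, retries=3, base_delay=1.0):
    """Call fn(), retrying failures with exponential backoff."""
    for attempt in range(retries):
        try:
            return fn()
        except Exception:  # narrow this to openai.APIError in real code
            if attempt == retries - 1:
                raise
            time.sleep(base_delay * (2 ** attempt))
```

A view could then wrap its call as `with_retry(lambda: chat("my-chat-workflow", messages))`.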

Next steps