Laravel 的事件驱动 AI

在 Laravel 控制器中接收 AI Webhooks,使用队列任务进行处理,并利用 Laravel 的队列和事件系统回调 ModelRiver。

概述

Laravel 强大的队列系统、中间件管道和 HTTP 客户端逻辑使其成为处理 ModelRiver 事件驱动 AI Webhooks 的理想选择。您可以将繁重的处理逻辑分配给后台任务,同时立即对 Webhook 做出响应。

您将构建的内容:

  • 一个带有签名验证中间件的 Webhook 控制器
  • 用于异步处理和回调的队列任务 (Queued jobs)
  • 用于 ModelRiver 回调的 HTTP 客户端集成
  • 具有可扩展性的事件/监听器模式

快速开始

安装依赖

Bash
composer create-project laravel/laravel my-ai-app
cd my-ai-app

环境变量

Bash
# .env
MODELRIVER_API_KEY=mr_live_YOUR_API_KEY
MODELRIVER_WEBHOOK_SECRET=your_webhook_secret

配置

PHP
1// config/services.php
2return [
3 // ...
4 'modelriver' => [
5 'api_key' => env('MODELRIVER_API_KEY'),
6 'webhook_secret' => env('MODELRIVER_WEBHOOK_SECRET'),
7 'base_url' => 'https://api.modelriver.com',
8 ],
9];

签名验证中间件

PHP
1// app/Http/Middleware/VerifyModelRiverSignature.php
2<?php
3 
4namespace App\Http\Middleware;
5 
6use Closure;
7use Illuminate\Http\Request;
8 
9class VerifyModelRiverSignature
10{
11 public function handle(Request $request, Closure $next)
12 {
13 $signature = $request->header('mr-signature', '');
14 $secret = config('services.modelriver.webhook_secret');
15 
16 $expected = hash_hmac('sha256', $request->getContent(), $secret);
17 
18 if (!hash_equals($expected, $signature)) {
19 return response()->json(['error' => 'Invalid signature'], 401);
20 }
21 
22 return $next($request);
23 }
24}

注册中间件:

PHP
1// bootstrap/app.php (Laravel 11+)
2->withMiddleware(function (Middleware $middleware) {
3 $middleware->alias([
4 'verify.modelriver' => \App\Http\Middleware\VerifyModelRiverSignature::class,
5 ]);
6})

Webhook 控制器

PHP
1// app/Http/Controllers/ModelRiverWebhookController.php
2<?php
3 
4namespace App\Http\Controllers;
5 
6use App\Jobs\ProcessAiWebhook;
7use Illuminate\Http\Request;
8use Illuminate\Http\JsonResponse;
9 
10class ModelRiverWebhookController extends Controller
11{
12 public function handle(Request $request): JsonResponse
13 {
14 $payload = $request->all();
15 $type = $payload['type'] ?? '';
16 $callbackUrl = $payload['callback_url'] ?? null;
17 
18 // 处理事件驱动的工作流
19 if ($type === 'task.ai_generated' && $callbackUrl) {
20 ProcessAiWebhook::dispatch(
21 event: $payload['event'] ?? '',
22 aiResponse: $payload['ai_response'] ?? [],
23 callbackUrl: $callbackUrl,
24 customerData: $payload['customer_data'] ?? [],
25 );
26 
27 return response()->json(['received' => true]);
28 }
29 
30 // 标准 Webhook
31 logger()->info('收到标准 Webhook', ['type' => $type]);
32 return response()->json(['received' => true]);
33 }
34}

路由

PHP
1// routes/api.php
2use App\Http\Controllers\ModelRiverWebhookController;
3 
4Route::post('/webhooks/modelriver', [ModelRiverWebhookController::class, 'handle'])
5 ->middleware('verify.modelriver');

队列任务 (Queued job)

PHP
1// app/Jobs/ProcessAiWebhook.php
2<?php
3 
4namespace App\Jobs;
5 
6use Illuminate\Bus\Queueable;
7use Illuminate\Contracts\Queue\ShouldQueue;
8use Illuminate\Foundation\Bus\Dispatchable;
9use Illuminate\Queue\InteractsWithQueue;
10use Illuminate\Queue\SerializesModels;
11use Illuminate\Support\Facades\Http;
12use Illuminate\Support\Facades\Log;
13 
14class ProcessAiWebhook implements ShouldQueue
15{
16 use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
17 
18 public int $tries = 3;
19 public int $backoff = 10;
20 
21 public function __construct(
22 public string $event,
23 public array $aiResponse,
24 public string $callbackUrl,
25 public array $customerData,
26 ) {}
27 
28 public function handle(): void
29 {
30 try {
31 $enrichedData = $this->aiResponse['data'] ?? [];
32 
33 // 您的自定义业务逻辑
34 if ($this->event === 'content_ready') {
35 $content = \App\Models\Content::create([
36 'title' => $enrichedData['title'] ?? '',
37 'body' => $enrichedData['description'] ?? '',
38 'category' => $this->customerData['category'] ?? 'general',
39 ]);
40 
41 $enrichedData['id'] = $content->id;
42 $enrichedData['slug'] = $content->slug;
43 $enrichedData['saved_at'] = now()->toISOString();
44 }
45 
46 if ($this->event === 'review_complete') {
47 // 调用外部服务
48 $enrichedData['reviewed'] = true;
49 $enrichedData['reviewed_at'] = now()->toISOString();
50 }
51 
52 // 回调 ModelRiver
53 $response = Http::withHeaders([
54 'Authorization' => 'Bearer ' . config('services.modelriver.api_key'),
55 ])->timeout(10)->post($this->callbackUrl, [
56 'data' => $enrichedData,
57 'task_id' => "laravel_{$this->event}_" . now()->timestamp,
58 'metadata' => [
59 'processed_by' => 'laravel',
60 'processed_at' => now()->toISOString(),
61 ],
62 ]);
63 
64 $response->throw();
65 Log::info("✅ 已针对事件发送回调: {$this->event}");
66 
67 } catch (\Exception $e) {
68 Log::error("❌ 回调失败: {$e->getMessage()}");
69 
70 // 发送错误回调
71 Http::withHeaders([
72 'Authorization' => 'Bearer ' . config('services.modelriver.api_key'),
73 ])->timeout(10)->post($this->callbackUrl, [
74 'error' => 'processing_failed',
75 'message' => $e->getMessage(),
76 ]);
77 
78 throw $e;
79 }
80 }
81}

触发异步请求

PHP
1// app/Services/ModelRiverService.php
2<?php
3 
4namespace App\Services;
5 
6use Illuminate\Support\Facades\Http;
7 
8class ModelRiverService
9{
10 public function triggerAsync(string $workflow, string $prompt, array $metadata = []): array
11 {
12 $response = Http::withHeaders([
13 'Authorization' => 'Bearer ' . config('services.modelriver.api_key'),
14 ])->post(config('services.modelriver.base_url') . '/v1/ai/async', [
15 'workflow' => $workflow,
16 'messages' => [
17 ['role' => 'user', 'content' => $prompt],
18 ],
19 'metadata' => $metadata,
20 ]);
21 
22 $response->throw();
23 return $response->json();
24 }
25}
26 
27// 在控制器中使用
28public function generate(Request $request, ModelRiverService $modelriver)
29{
30 $result = $modelriver->triggerAsync(
31 workflow: 'content_generator',
32 prompt: $request->input('prompt'),
33 metadata: ['user_id' => auth()->id()],
34 );
35 
36 return response()->json([
37 'channel_id' => $result['channel_id'],
38 'ws_token' => $result['ws_token'],
39 'websocket_channel' => $result['websocket_channel'],
40 ]);
41}

最佳实践

  1. 使用队列任务:绝不要让繁重的处理逻辑阻塞 Webhook 响应。
  2. 设置 $tries$backoff:通过重试处理瞬时故障。
  3. 使用 Laravel 的 HTTP 客户端:内置重试、超时和错误处理。
  4. 注册中间件:在调用控制器之前验证签名。
  5. 使用 Laravel 事件:分发事件以增强可扩展性(如 WebhookReceived, CallbackSent)。

下一步