.NET 的事件驱动 AI

在 ASP.NET Core minimal APIs 中接收 AI Webhooks,使用托管服务或通道进行处理,并利用 HttpClient 回调 ModelRiver。

概述

ASP.NET Core 的 minimal APIs 和 System.Threading.Channels 为处理 ModelRiver 事件驱动 AI Webhooks 提供了一个高性能的管道。使用中间件进行签名验证,使用通道 (Channels) 进行后台处理,并使用 HttpClient 进行回调。

您将构建的内容:

  • 一个用于接收 Webhooks 的 minimal API 端点
  • HMAC 签名验证中间件
  • 使用 IHostedServiceChannel<T> 进行后台处理
  • 利用 HttpClient 向 ModelRiver 发起回调

快速开始

创建项目

Bash
dotnet new web -n MyAiApp
cd MyAiApp

配置

JSON
1// appsettings.json
2{
3 "ModelRiver": {
4 "ApiKey": "mr_live_YOUR_API_KEY",
5 "WebhookSecret": "your_webhook_secret",
6 "BaseUrl": "https://api.modelriver.com"
7 }
8}
CSHARP
1// ModelRiverOptions.cs
2public class ModelRiverOptions
3{
4 public string ApiKey { get; set; } = "";
5 public string WebhookSecret { get; set; } = "";
6 public string BaseUrl { get; set; } = "https://api.modelriver.com";
7}

Webhook 端点

CSHARP
1// Program.cs
2using System.Security.Cryptography;
3using System.Text;
4using System.Text.Json;
5using System.Threading.Channels;
6 
7var builder = WebApplication.CreateBuilder(args);
8 
9builder.Services.Configure<ModelRiverOptions>(
10 builder.Configuration.GetSection("ModelRiver"));
11 
12builder.Services.AddHttpClient("ModelRiver");
13 
14// 后台处理通道
15var channel = Channel.CreateUnbounded<WebhookJob>();
16builder.Services.AddSingleton(channel);
17builder.Services.AddHostedService<WebhookProcessor>();
18 
19var app = builder.Build();
20 
21app.MapPost("/webhooks/modelriver", async (
22 HttpContext context,
23 Channel<WebhookJob> jobChannel,
24 IConfiguration config) =>
25{
26 // 1. 读取原始 Body
27 using var reader = new StreamReader(context.Request.Body);
28 var rawBody = await reader.ReadToEndAsync();
29 
30 // 2. 验证签名
31 var signature = context.Request.Headers["mr-signature"].FirstOrDefault() ?? "";
32 var secret = config["ModelRiver:WebhookSecret"] ?? "";
33 
34 using var hmac = new HMACSHA256(Encoding.UTF8.GetBytes(secret));
35 var hash = hmac.ComputeHash(Encoding.UTF8.GetBytes(rawBody));
36 var expected = Convert.ToHexString(hash).ToLower();
37 
38 if (!CryptographicOperations.FixedTimeEquals(
39 Encoding.UTF8.GetBytes(expected),
40 Encoding.UTF8.GetBytes(signature)))
41 {
42 return Results.Unauthorized();
43 }
44 
45 // 3. 解析负载
46 var payload = JsonSerializer.Deserialize<WebhookPayload>(rawBody,
47 new JsonSerializerOptions { PropertyNameCaseInsensitive = true });
48 
49 if (payload is null)
50 return Results.BadRequest();
51 
52 // 4. 处理事件驱动的工作流
53 if (payload.Type == "task.ai_generated" && payload.CallbackUrl is not null)
54 {
55 await jobChannel.Writer.WriteAsync(new WebhookJob
56 {
57 Event = payload.Event ?? "",
58 AiResponse = payload.AiResponse,
59 CallbackUrl = payload.CallbackUrl,
60 CustomerData = payload.CustomerData ?? new(),
61 });
62 
63 return Results.Ok(new { received = true });
64 }
65 
66 return Results.Ok(new { received = true });
67});
68 
69app.Run();
70 
71// --- 模型 ---
72 
73public record WebhookPayload
74{
75 public string Type { get; init; } = "";
76 public string? Event { get; init; }
77 public string ChannelId { get; init; } = "";
78 public AiResponseData? AiResponse { get; init; }
79 public string? CallbackUrl { get; init; }
80 public bool? CallbackRequired { get; init; }
81 public Dictionary<string, object>? CustomerData { get; init; }
82 public string? Timestamp { get; init; }
83}
84 
85public record AiResponseData
86{
87 public Dictionary<string, object> Data { get; init; } = new();
88}
89 
90public record WebhookJob
91{
92 public string Event { get; init; } = "";
93 public AiResponseData? AiResponse { get; init; }
94 public string CallbackUrl { get; init; } = "";
95 public Dictionary<string, object> CustomerData { get; init; } = new();
96}

后台处理器

CSHARP
1// WebhookProcessor.cs
2using System.Text.Json;
3using System.Threading.Channels;
4using Microsoft.Extensions.Options;
5 
6public class WebhookProcessor : BackgroundService
7{
8 private readonly Channel<WebhookJob> _channel;
9 private readonly IHttpClientFactory _httpClientFactory;
10 private readonly ModelRiverOptions _options;
11 private readonly ILogger<WebhookProcessor> _logger;
12 
13 public WebhookProcessor(
14 Channel<WebhookJob> channel,
15 IHttpClientFactory httpClientFactory,
16 IOptions<ModelRiverOptions> options,
17 ILogger<WebhookProcessor> logger)
18 {
19 _channel = channel;
20 _httpClientFactory = httpClientFactory;
21 _options = options.Value;
22 _logger = logger;
23 }
24 
25 protected override async Task ExecuteAsync(CancellationToken stoppingToken)
26 {
27 await foreach (var job in _channel.Reader.ReadAllAsync(stoppingToken))
28 {
29 try
30 {
31 await ProcessJobAsync(job, stoppingToken);
32 }
33 catch (Exception ex)
34 {
35 _logger.LogError(ex, "为事件处理 Webhook 任务失败: {Event}", job.Event);
36 }
37 }
38 }
39 
40 private async Task ProcessJobAsync(WebhookJob job, CancellationToken ct)
41 {
42 var client = _httpClientFactory.CreateClient("ModelRiver");
43 client.DefaultRequestHeaders.Add("Authorization", $"Bearer {_options.ApiKey}");
44 
45 var enrichedData = job.AiResponse?.Data ?? new Dictionary<string, object>();
46 
47 // 您的自定义业务逻辑
48 if (job.Event == "content_ready")
49 {
50 enrichedData["processed"] = true;
51 enrichedData["saved_at"] = DateTime.UtcNow.ToString("O");
52 }
53 
54 // 回调 ModelRiver
55 var callback = new
56 {
57 data = enrichedData,
58 task_id = $"dotnet_{job.Event}_{DateTimeOffset.UtcNow.ToUnixTimeSeconds()}",
59 metadata = new
60 {
61 processed_by = "dotnet",
62 processed_at = DateTime.UtcNow.ToString("O"),
63 },
64 };
65 
66 var content = new StringContent(
67 JsonSerializer.Serialize(callback),
68 System.Text.Encoding.UTF8,
69 "application/json");
70 
71 var response = await client.PostAsync(job.CallbackUrl, content, ct);
72 
73 if (response.IsSuccessStatusCode)
74 {
75 _logger.LogInformation("✅ 已针对事件发送回调: {Event}", job.Event);
76 }
77 else
78 {
79 _logger.LogError("❌ 回调失败 ({Status})", response.StatusCode);
80 
81 // 发送错误回调
82 var errorContent = new StringContent(
83 JsonSerializer.Serialize(new { error = "processing_failed", message = $"HTTP {response.StatusCode}" }),
84 System.Text.Encoding.UTF8,
85 "application/json");
86 
87 await client.PostAsync(job.CallbackUrl, errorContent, ct);
88 }
89 }
90}

触发异步请求

CSHARP
1// ModelRiverService.cs
2public class ModelRiverService
3{
4 private readonly HttpClient _client;
5 private readonly ModelRiverOptions _options;
6 
7 public ModelRiverService(IHttpClientFactory factory, IOptions<ModelRiverOptions> options)
8 {
9 _client = factory.CreateClient("ModelRiver");
10 _options = options.Value;
11 _client.DefaultRequestHeaders.Add("Authorization", $"Bearer {_options.ApiKey}");
12 }
13 
14 public async Task<AsyncAiResponse> TriggerAsync(string workflow, string prompt)
15 {
16 var response = await _client.PostAsJsonAsync($"{_options.BaseUrl}/v1/ai/async", new
17 {
18 workflow,
19 messages = new[] { new { role = "user", content = prompt } },
20 });
21 
22 response.EnsureSuccessStatusCode();
23 return await response.Content.ReadFromJsonAsync<AsyncAiResponse>()
24 ?? throw new Exception("解析响应失败");
25 }
26}
27 
28public record AsyncAiResponse(
29 string ChannelId,
30 string WsToken,
31 string WebsocketUrl,
32 string WebsocketChannel
33);

最佳实践

  1. 使用 Channel<T>:用于后台处理的高性能进程内队列。
  2. 使用 CryptographicOperations.FixedTimeEquals:用于签名的恒定时间比较。
  3. 使用 IHttpClientFactory:适当的连接池和生命周期管理。
  4. 使用 IHostedService:随应用程序启动的长运行后台处理器。
  5. 读取原始 Body:在模型绑定之前读取流以进行签名验证。

下一步