概述
ASP.NET Core 的 minimal APIs 和 System.Threading.Channels 为处理 ModelRiver 事件驱动 AI Webhooks 提供了一个高性能的管道。在端点中使用 HMAC 进行签名验证,使用通道 (Channels) 进行后台处理,并使用 HttpClient 进行回调。
您将构建的内容:
- 一个用于接收 Webhooks 的 minimal API 端点
- 端点内的 HMAC 签名验证
- 使用 IHostedService 和 Channel<T> 进行后台处理
- 利用 HttpClient 向 ModelRiver 发起回调
快速开始
创建项目
Bash
dotnet new web -n MyAiApp
cd MyAiApp
配置
JSON
// appsettings.json
{
  "ModelRiver": {
    "ApiKey": "mr_live_YOUR_API_KEY",
    "WebhookSecret": "your_webhook_secret",
    "BaseUrl": "https://api.modelriver.com"
  }
}
CSHARP
// ModelRiverOptions.cs

/// <summary>
/// Strongly typed view of the "ModelRiver" configuration section.
/// </summary>
public class ModelRiverOptions
{
    /// <summary>API key sent as a Bearer token on requests to ModelRiver.</summary>
    public string ApiKey { get; set; } = string.Empty;

    /// <summary>Shared secret used to verify the HMAC signature of incoming webhooks.</summary>
    public string WebhookSecret { get; set; } = string.Empty;

    /// <summary>Base URL of the ModelRiver API.</summary>
    public string BaseUrl { get; set; } = "https://api.modelriver.com";
}
Webhook 端点
CSHARP
// Program.cs
using System.Security.Cryptography;
using System.Text;
using System.Text.Json;
using System.Threading.Channels;

var builder = WebApplication.CreateBuilder(args);

builder.Services.Configure<ModelRiverOptions>(
    builder.Configuration.GetSection("ModelRiver"));

builder.Services.AddHttpClient("ModelRiver");

// In-process queue that hands verified webhooks to the background processor.
var channel = Channel.CreateUnbounded<WebhookJob>();
builder.Services.AddSingleton(channel);
builder.Services.AddHostedService<WebhookProcessor>();

var app = builder.Build();

// Built once and reused: allocating JsonSerializerOptions per request throws away
// the serializer's cached type metadata.
// NOTE(review): this only tolerates casing differences. If ModelRiver sends
// snake_case keys (e.g. "callback_url"), a naming policy or [JsonPropertyName]
// attributes are required — confirm against the webhook payload reference.
var payloadJsonOptions = new JsonSerializerOptions { PropertyNameCaseInsensitive = true };

// Receives ModelRiver webhooks: verifies the HMAC-SHA256 signature of the raw body,
// parses the payload and queues "task.ai_generated" events for background processing.
// Responds 401 on a signature mismatch, 400 on an unparsable body, 500 when no
// webhook secret is configured, and 200 otherwise.
app.MapPost("/webhooks/modelriver", async (
    HttpContext context,
    Channel<WebhookJob> jobChannel,
    IConfiguration config) =>
{
    // 1. Read the raw body; the signature is computed over this exact text.
    using var reader = new StreamReader(context.Request.Body);
    var rawBody = await reader.ReadToEndAsync();

    // 2. Verify the signature.
    var secret = config["ModelRiver:WebhookSecret"];
    if (string.IsNullOrEmpty(secret))
    {
        // Fail closed: an HMAC keyed with an empty secret can be computed by anyone,
        // so accepting it would make every request look correctly signed.
        app.Logger.LogError("ModelRiver:WebhookSecret is not configured; rejecting webhook.");
        return Results.StatusCode(StatusCodes.Status500InternalServerError);
    }

    var signature = context.Request.Headers["mr-signature"].FirstOrDefault() ?? "";
    var hash = HMACSHA256.HashData(
        Encoding.UTF8.GetBytes(secret),
        Encoding.UTF8.GetBytes(rawBody));
    var expected = Convert.ToHexString(hash).ToLowerInvariant();

    // Constant-time comparison; inputs of different length simply compare unequal.
    if (!CryptographicOperations.FixedTimeEquals(
            Encoding.UTF8.GetBytes(expected),
            Encoding.UTF8.GetBytes(signature)))
    {
        return Results.Unauthorized();
    }

    // 3. Parse the payload.
    WebhookPayload? payload;
    try
    {
        payload = JsonSerializer.Deserialize<WebhookPayload>(rawBody, payloadJsonOptions);
    }
    catch (JsonException)
    {
        // Malformed JSON is a client error, not a server fault.
        return Results.BadRequest();
    }

    if (payload is null)
    {
        return Results.BadRequest();
    }

    // 4. Queue event-driven workflows; every other event type is only acknowledged.
    if (payload.Type == "task.ai_generated" && payload.CallbackUrl is not null)
    {
        await jobChannel.Writer.WriteAsync(new WebhookJob
        {
            Event = payload.Event ?? "",
            AiResponse = payload.AiResponse,
            CallbackUrl = payload.CallbackUrl,
            CustomerData = payload.CustomerData ?? new(),
        });
    }

    return Results.Ok(new { received = true });
});

app.Run();

// --- Models ---

/// <summary>Webhook request body as deserialized by the endpoint.</summary>
public record WebhookPayload
{
    public string Type { get; init; } = "";
    public string? Event { get; init; }
    public string ChannelId { get; init; } = "";
    public AiResponseData? AiResponse { get; init; }
    public string? CallbackUrl { get; init; }
    public bool? CallbackRequired { get; init; }
    public Dictionary<string, object>? CustomerData { get; init; }
    public string? Timestamp { get; init; }
}

/// <summary>AI output carried by a webhook payload.</summary>
public record AiResponseData
{
    public Dictionary<string, object> Data { get; init; } = new();
}

/// <summary>Unit of work queued by the endpoint for the background processor.</summary>
public record WebhookJob
{
    public string Event { get; init; } = "";
    public AiResponseData? AiResponse { get; init; }
    public string CallbackUrl { get; init; } = "";
    public Dictionary<string, object> CustomerData { get; init; } = new();
}
后台处理器
CSHARP
// WebhookProcessor.cs
using System.Text.Json;
using System.Threading.Channels;
using Microsoft.Extensions.Options;

/// <summary>
/// Hosted background service that drains the <see cref="WebhookJob"/> channel and
/// posts the processed result to each job's callback URL.
/// </summary>
public class WebhookProcessor : BackgroundService
{
    private readonly Channel<WebhookJob> _channel;
    private readonly IHttpClientFactory _httpClientFactory;
    private readonly ModelRiverOptions _options;
    private readonly ILogger<WebhookProcessor> _logger;

    public WebhookProcessor(
        Channel<WebhookJob> channel,
        IHttpClientFactory httpClientFactory,
        IOptions<ModelRiverOptions> options,
        ILogger<WebhookProcessor> logger)
    {
        _channel = channel;
        _httpClientFactory = httpClientFactory;
        _options = options.Value;
        _logger = logger;
    }

    /// <summary>
    /// Processes queued jobs one at a time until the host requests shutdown.
    /// A failing job is logged and skipped so it cannot stop the loop.
    /// </summary>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        await foreach (var job in _channel.Reader.ReadAllAsync(stoppingToken))
        {
            try
            {
                await ProcessJobAsync(job, stoppingToken);
            }
            catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
            {
                // Host shutdown interrupted the job; this is not a processing failure.
                break;
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "为事件处理 Webhook 任务失败: {Event}", job.Event);
            }
        }
    }

    /// <summary>
    /// Applies the custom business logic to a job and posts the result to its
    /// callback URL. If that request returns a non-success status, an error payload
    /// is posted to the same URL.
    /// </summary>
    /// <param name="job">The queued webhook job.</param>
    /// <param name="ct">Token that cancels the outgoing HTTP calls.</param>
    private async Task ProcessJobAsync(WebhookJob job, CancellationToken ct)
    {
        // NOTE(review): the API key is attached to a request whose URL comes from the
        // webhook payload. Consider checking the callback host before sending it.
        var client = _httpClientFactory.CreateClient("ModelRiver");
        client.DefaultRequestHeaders.Add("Authorization", $"Bearer {_options.ApiKey}");

        // When the job carries AI data this is the same dictionary instance,
        // so the additions below mutate it in place.
        var enrichedData = job.AiResponse?.Data ?? new Dictionary<string, object>();

        // Your custom business logic
        if (job.Event == "content_ready")
        {
            enrichedData["processed"] = true;
            enrichedData["saved_at"] = DateTime.UtcNow.ToString("O");
        }

        // Call back to ModelRiver
        var callback = new
        {
            data = enrichedData,
            task_id = $"dotnet_{job.Event}_{DateTimeOffset.UtcNow.ToUnixTimeSeconds()}",
            metadata = new
            {
                processed_by = "dotnet",
                processed_at = DateTime.UtcNow.ToString("O"),
            },
        };

        using var content = new StringContent(
            JsonSerializer.Serialize(callback),
            System.Text.Encoding.UTF8,
            "application/json");

        // Dispose responses so their connections are released promptly.
        using var response = await client.PostAsync(job.CallbackUrl, content, ct);

        if (response.IsSuccessStatusCode)
        {
            _logger.LogInformation("✅ 已针对事件发送回调: {Event}", job.Event);
            return;
        }

        _logger.LogError("❌ 回调失败 ({Status})", response.StatusCode);

        // Send an error callback
        using var errorContent = new StringContent(
            JsonSerializer.Serialize(new { error = "processing_failed", message = $"HTTP {response.StatusCode}" }),
            System.Text.Encoding.UTF8,
            "application/json");

        using var errorResponse = await client.PostAsync(job.CallbackUrl, errorContent, ct);
    }
}
触发异步请求
CSHARP
// ModelRiverService.cs
using Microsoft.Extensions.Options;

/// <summary>
/// Client for starting asynchronous AI requests against the ModelRiver API.
/// </summary>
public class ModelRiverService
{
    private readonly HttpClient _client;
    private readonly ModelRiverOptions _options;

    public ModelRiverService(IHttpClientFactory factory, IOptions<ModelRiverOptions> options)
    {
        _client = factory.CreateClient("ModelRiver");
        _options = options.Value;
        _client.DefaultRequestHeaders.Add("Authorization", $"Bearer {_options.ApiKey}");
    }

    /// <summary>
    /// Posts a single user message to the async AI endpoint for the given workflow.
    /// </summary>
    /// <param name="workflow">Workflow identifier sent in the request body.</param>
    /// <param name="prompt">Content of the user message.</param>
    /// <param name="cancellationToken">Optional token that cancels the HTTP call.</param>
    /// <returns>The deserialized response.</returns>
    /// <exception cref="HttpRequestException">The API returned a non-success status code.</exception>
    /// <exception cref="InvalidOperationException">The response body deserialized to null.</exception>
    public async Task<AsyncAiResponse> TriggerAsync(
        string workflow,
        string prompt,
        CancellationToken cancellationToken = default)
    {
        // Trim so a BaseUrl configured with a trailing slash does not yield "//v1/...".
        var requestUri = $"{_options.BaseUrl.TrimEnd('/')}/v1/ai/async";

        using var response = await _client.PostAsJsonAsync(
            requestUri,
            new
            {
                workflow,
                messages = new[] { new { role = "user", content = prompt } },
            },
            cancellationToken);

        response.EnsureSuccessStatusCode();
        return await response.Content.ReadFromJsonAsync<AsyncAiResponse>(cancellationToken)
            ?? throw new InvalidOperationException("解析响应失败");
    }
}

// NOTE(review): binding relies on the default web JSON options (camelCase,
// case-insensitive). If the API returns snake_case keys such as "channel_id",
// these properties need [JsonPropertyName] attributes — confirm against the API reference.
public record AsyncAiResponse(
    string ChannelId,
    string WsToken,
    string WebsocketUrl,
    string WebsocketChannel
);
最佳实践
- 使用 Channel<T>:用于后台处理的高性能进程内队列。
- 使用 CryptographicOperations.FixedTimeEquals:用于签名的恒定时间比较。
- 使用 IHttpClientFactory:适当的连接池和生命周期管理。
- 使用 IHostedService:随应用程序启动的长运行后台处理器。
- 读取原始 Body:在模型绑定之前读取流以进行签名验证。
下一步
- 返回 后端框架:所有框架指南
- Webhooks 参考:重试策略和投递监控
- 事件驱动 AI 概述:架构和流程