概述
Spring Boot 的企业级特性(如依赖注入、异步处理和 WebClient)为大规模处理 ModelRiver 事件驱动 AI Webhooks 提供了稳健的基础。
您将构建的内容:
- 一个用于接收 Webhooks 的 REST 控制器端点
- HMAC 签名验证过滤器 (Filter)
- 用于后台处理的异步服务
- 用于非阻塞回调的响应式 WebClient
快速开始
创建项目
使用 Spring Initializr,并选择:
- 依赖项: Spring Web, Spring WebFlux (用于 WebClient)
- Java 版本: 17+
配置
YAML
# application.yml
modelriver:
  api-key: ${MODELRIVER_API_KEY}
  webhook-secret: ${MODELRIVER_WEBHOOK_SECRET}
  base-url: https://api.modelriver.com
JAVA
1// ModelRiverProperties.java2@ConfigurationProperties(prefix = "modelriver")3public record ModelRiverProperties(4 String apiKey,5 String webhookSecret,6 String baseUrl7) {}签名验证过滤器 (Filter)
JAVA
1// WebhookSignatureFilter.java2import jakarta.servlet.*;3import jakarta.servlet.http.*;4import org.springframework.stereotype.Component;5import org.springframework.web.util.ContentCachingRequestWrapper;6import javax.crypto.Mac;7import javax.crypto.spec.SecretKeySpec;8import java.io.IOException;9import java.security.MessageDigest;10 11@Component12public class WebhookSignatureFilter implements Filter {13 14 private final ModelRiverProperties properties;15 16 public WebhookSignatureFilter(ModelRiverProperties properties) {17 this.properties = properties;18 }19 20 @Override21 public void doFilter(ServletRequest req, ServletResponse res, FilterChain chain)22 throws IOException, ServletException {23 HttpServletRequest request = (HttpServletRequest) req;24 HttpServletResponse response = (HttpServletResponse) res;25 26 // 仅应用于 Webhook 端点27 if (!request.getRequestURI().startsWith("/webhooks/modelriver")) {28 chain.doFilter(req, res);29 return;30 }31 32 ContentCachingRequestWrapper wrappedRequest = new ContentCachingRequestWrapper(request);33 chain.doFilter(wrappedRequest, res);34 }35 36 public boolean verifySignature(String payload, String signature) {37 try {38 Mac mac = Mac.getInstance("HmacSHA256");39 SecretKeySpec key = new SecretKeySpec(40 properties.webhookSecret().getBytes(), "HmacSHA256"41 );42 mac.init(key);43 byte[] hash = mac.doFinal(payload.getBytes());44 45 String expected = bytesToHex(hash);46 return MessageDigest.isEqual(47 expected.getBytes(),48 signature.getBytes()49 );50 } catch (Exception e) {51 return false;52 }53 }54 55 private String bytesToHex(byte[] bytes) {56 StringBuilder sb = new StringBuilder();57 for (byte b : bytes) {58 sb.append(String.format("%02x", b));59 }60 return sb.toString();61 }62}Webhook 控制器
JAVA
1// WebhookController.java2import org.springframework.http.ResponseEntity;3import org.springframework.web.bind.annotation.*;4import java.util.Map;5 6@RestController7@RequestMapping("/webhooks")8public class WebhookController {9 10 private final WebhookSignatureFilter signatureFilter;11 private final AiWebhookProcessor processor;12 13 public WebhookController(14 WebhookSignatureFilter signatureFilter,15 AiWebhookProcessor processor16 ) {17 this.signatureFilter = signatureFilter;18 this.processor = processor;19 }20 21 @PostMapping("/modelriver")22 public ResponseEntity<Map<String, Boolean>> handleWebhook(23 @RequestBody String rawBody,24 @RequestHeader(value = "mr-signature", defaultValue = "") String signature25 ) {26 // 1. 验证签名27 if (!signatureFilter.verifySignature(rawBody, signature)) {28 return ResponseEntity.status(401).build();29 }30 31 // 2. 解析负载32 Map<String, Object> payload = parseJson(rawBody);33 String type = (String) payload.getOrDefault("type", "");34 String callbackUrl = (String) payload.get("callback_url");35 36 // 3. 处理事件驱动的工作流37 if ("task.ai_generated".equals(type) && callbackUrl != null) {38 processor.processAsync(39 (String) payload.get("event"),40 (Map<String, Object>) payload.get("ai_response"),41 callbackUrl,42 (Map<String, Object>) payload.getOrDefault("customer_data", Map.of())43 );44 return ResponseEntity.ok(Map.of("received", true));45 }46 47 return ResponseEntity.ok(Map.of("received", true));48 }49 50 @SuppressWarnings("unchecked")51 private Map<String, Object> parseJson(String json) {52 try {53 return new com.fasterxml.jackson.databind.ObjectMapper()54 .readValue(json, Map.class);55 } catch (Exception e) {56 return Map.of();57 }58 }59}异步处理器服务
JAVA
1// AiWebhookProcessor.java2import org.springframework.scheduling.annotation.Async;3import org.springframework.stereotype.Service;4import org.springframework.web.reactive.function.client.WebClient;5import org.slf4j.Logger;6import org.slf4j.LoggerFactory;7import java.time.Instant;8import java.util.HashMap;9import java.util.Map;10 11@Service12public class AiWebhookProcessor {13 14 private static final Logger log = LoggerFactory.getLogger(AiWebhookProcessor.class);15 private final WebClient webClient;16 private final ModelRiverProperties properties;17 18 public AiWebhookProcessor(ModelRiverProperties properties) {19 this.properties = properties;20 this.webClient = WebClient.builder()21 .defaultHeader("Content-Type", "application/json")22 .build();23 }24 25 @Async26 public void processAsync(27 String event,28 Map<String, Object> aiResponse,29 String callbackUrl,30 Map<String, Object> customerData31 ) {32 try {33 Map<String, Object> data = aiResponse != null34 ? new HashMap<>((Map<String, Object>) aiResponse.getOrDefault("data", Map.of()))35 : new HashMap<>();36 37 // 您的自定义业务逻辑38 if ("content_ready".equals(event)) {39 data.put("processed", true);40 data.put("saved_at", Instant.now().toString());41 }42 43 // 回调 ModelRiver44 Map<String, Object> callback = Map.of(45 "data", data,46 "task_id", "springboot_" + event + "_" + Instant.now().getEpochSecond(),47 "metadata", Map.of(48 "processed_by", "spring-boot",49 "processed_at", Instant.now().toString()50 )51 );52 53 webClient.post()54 .uri(callbackUrl)55 .header("Authorization", "Bearer " + properties.apiKey())56 .bodyValue(callback)57 .retrieve()58 .toBodilessEntity()59 .block();60 61 log.info("✅ 已针对事件发送回调: {}", event);62 63 } catch (Exception e) {64 log.error("❌ 回调失败: {}", e.getMessage());65 66 // 发送错误回调67 try {68 webClient.post()69 .uri(callbackUrl)70 .header("Authorization", "Bearer " + properties.apiKey())71 .bodyValue(Map.of(72 "error", "processing_failed",73 "message", e.getMessage()74 ))75 .retrieve()76 
.toBodilessEntity()77 .block();78 } catch (Exception ignored) {}79 }80 }81}启用异步处理:
JAVA
1// Application.java2@SpringBootApplication3@EnableAsync4@EnableConfigurationProperties(ModelRiverProperties.class)5public class MyAiAppApplication {6 public static void main(String[] args) {7 SpringApplication.run(MyAiAppApplication.class, args);8 }9}最佳实践
- 使用 @Async:在独立的线程池中处理 Webhooks,以避免阻塞。
- 使用 WebClient:用于回调的非阻塞 HTTP 客户端。
- 使用 @ConfigurationProperties:类型安全的配置管理。
- 使用 MessageDigest.isEqual:用于签名验证的恒定时间比较。
- 配置线程池:针对生产环境负载设置 spring.task.execution.pool.core-size。
下一步
- .NET 事件驱动指南:C# 备选方案
- Webhooks 参考:重试策略和投递监控
- 事件驱动 AI 概述:架构和流程