限流和错误处理
限流与错误处理:全局冷却、智能退避与可观测性闭环
在生产环境中,大模型调用失败的原因通常不是单点的“请求参数错误”,而是由限流、配额、网络波动、上游网关不稳定等因素叠加造成。尤其在并发场景下,如果缺少统一的限流与重试策略,系统很容易出现两类问题:
失败放大 多个并发请求同时遇到 429 后各自重试,进一步触发更严重的限流,形成雪崩。
排障困难 没有记录真实请求与响应,告警只剩下异常堆栈,无法快速判断是配额问题、协议问题还是上游不可用。
本节给出一套可直接落地的解决方案,核心由两部分组成:
ApiCooldownManager:全局冷却管理器,用于跨线程抑制限流风暴UniPredictService:统一调用入口,集成重试、分级告警、智能退避、落库与代理策略
同时保证与前文的请求记录体系一致:
- 成功调用保存 usage:
Aop.get(MvLlmUsageDao.class).saveUsage(...) - 失败调用保存 request/response:
Aop.get(MvLlmGenerateFailedDao.class).save(...)
1. 设计目标
一套稳定的限流与错误处理体系应该满足:
- 对 429 具备全局视角,不让并发请求各自盲目重试
- 对不同错误码区分对待,避免无意义重试
- 遇到上游建议的重试时间时能够自动遵守,减少试探成本
- 所有成功与失败都能落库,形成可观测性闭环
- 告警信息携带必要上下文,但避免发送超大请求体导致告警系统压力
2. 全局冷却管理器 ApiCooldownManager
ApiCooldownManager 的职责很明确:为某个服务维度(通常是 apiKey 或 providerKey)维护一个“冷却截止时间”。
2.1 核心机制
recordCooldown(serviceKey, delay)记录冷却窗口,把“允许再次请求的时间点”写入内存表,并用merge(Math::max)保证只会延长,不会被较短窗口覆盖。enforceCooldown(serviceKey)在每一次请求前检查冷却窗口,若仍处于冷却中则阻塞当前线程直到冷却结束。
2.2 为什么需要 COOLDOWN_BUFFER_MS
很多平台给出的 retryDelay 是估算值,网络延迟、时钟偏差、队列抖动都可能导致“刚到时间点仍然被限流”。加一个小 buffer 可以降低边界抖动带来的二次 429。
2.3 serviceKey 的选择策略
你当前实现使用 uniChatRequest.getApiKey() 作为 serviceKey,优点是:
- 与平台配额/限流强相关
- 能显著降低同一 key 下的并发风暴
可选的增强策略是:platform + ":" + apiKey,防止同一 key 在不同平台之间互相影响。但为了最少改动,你的实现已经足够实用。
3. 统一入口 UniPredictService:重试、退避与分级处理
UniPredictService 把所有复杂度集中在一个点上,让业务侧只做两件事:
- 构造
UniChatRequest - 调用
uniPredictService.generate(request)
3.1 调用前置:环境与代理策略
applyChinaProxyIfNeeded(uniChatRequest);
这段逻辑将“环境差异”从业务逻辑中剥离:
- 境内 + OpenRouter → 自动设置代理前缀
- 业务不需要写 if/else 判断网络环境
这一点对多环境部署非常重要,否则每个业务服务都会开始复制相同的代理配置逻辑。
4. 失败分类:不要把所有错误都当成可重试
在大模型调用场景中,正确的做法不是“失败就重试”,而是先判断错误类型。
4.1 403:权限与配额策略错误
if (ModelPlatformName.GOOGLE.equals(req.getPlatform()) && code == 403) {
throw e;
}
这类错误通常意味着:
- key 没权限
- 账号策略禁止
- 服务未开通
- 组织策略限制
继续重试没有任何意义,反而会浪费资源,制造更多告警。因此直接抛出,让上层处理更合适。
4.2 400:余额不足等业务性不可恢复错误
你对 400 做了“消息识别”:
- 如果命中 “Your credit balance is too low …” 直接停止重试,并发出告警(不带大请求体)
这是非常实用的策略:把“不可恢复错误”从重试队列中剔除,避免对同一错误做重复尝试。
4.3 502:上游网关不稳定类错误
你对 502 的告警选择“不带 requestJson”,理由合理:
- 502 多为上游网关或中间层异常
- 请求体通常不是问题根源
- 不携带大体积 request 可以降低告警压力与数据泄露风险
5. 429:限流的关键处理路径
429 是最需要精细化处理的错误码,主要目标是防止并发风暴。
你的实现由三个步骤组成:
- 请求前执行全局冷却
- 解析上游建议等待时间
- 更新全局冷却并对当前请求退避
5.1 请求前冷却:enforceCooldown
apiCooldownManager.enforceCooldown(serviceKey);
这一步解决了最重要的问题:当某个 apiKey 已经处于限流窗口时,后续请求不会继续冲击上游。
5.2 从 Gemini 错误中提取 retryDelay
Gemini 的 429 响应中,通常会携带 google.rpc.RetryInfo,你的实现会扫描:
error.details[]- 找到
@type = type.googleapis.com/google.rpc.RetryInfo - 提取
retryDelay
这一点比固定退避更优,因为它尊重上游限流恢复节奏。
注意:你当前 parseRetryDelayToMillis 用正则提取数字并按秒处理,能够覆盖常见的 3s、10s 这种格式。若未来 Gemini 返回更复杂的 duration 格式,可以再增强解析逻辑,但当前实现对生产已经很有价值。
5.3 更新全局冷却:recordCooldown
apiCooldownManager.recordCooldown(serviceKey, retryDelayMillis);
这一步是“跨线程的关键动作”。如果只有当前线程 sleep,而没有写入全局冷却,那么其他并发请求仍会继续打上游,形成浪涌。
6. 退避策略:默认退避与智能退避的组合
你的退避策略是“分场景选择延迟”:
- Gemini + 429:使用解析出的 retryDelay(解析失败则回落默认值)
- 其他错误:使用默认 30 秒
这是一种偏稳健的策略,适合生产早期快速稳定系统。
6.1 为什么不直接指数退避
指数退避(例如 1s, 2s, 4s, 8s)在很多场景很有效,但对上游明确给出 retryDelay 的系统来说,盲目指数退避反而可能:
- 退避不足,继续撞限流
- 退避过度,降低吞吐
你当前的选择更符合“遵守上游建议、其他情况稳定退避”的原则。后续如果需要进一步优化,可以在“默认退避”分支引入指数退避与抖动,但并不是必须。
7. 可中断性:避免线程被无限阻塞
你的实现对两种 sleep 都做了中断处理:
- enforceCooldown 触发的 sleep
- 本次重试的 backoff sleep
一旦线程被 interrupt:
- 立即恢复中断标志
- 抛出 RuntimeException 结束流程
这很重要,因为在容器关闭、线程池回收、请求取消时,中断是释放资源的标准信号。忽略中断会导致线程在 sleep 中卡住,影响系统关停与资源回收。
8. 与请求记录系统的结合
这一套限流与错误处理之所以强,是因为它与请求记录体系组合后形成闭环:
8.1 成功路径
- 记录耗时 elapsed
- 记录 usage(token 等)
- 记录输出 content(可选脱敏)
Aop.get(MvLlmUsageDao.class).saveUsage(uniChatRequest, uniChatResponse, elapsed);
这可以支撑:
- 成本统计
- 模型效果抽检
- 延迟指标与 SLA
8.2 失败路径
- 保存最终请求 URL
- 保存转换后的 requestJson
- 保存 responseBody 与 statusCode
- 保存堆栈
Aop.get(MvLlmGenerateFailedDao.class).save(uniChatRequest, e, stackTrace);
这对排障尤其关键,因为 requestJson 是“最终发给平台的协议请求”,可以直接验证协议自动转换是否正确。
9. 生产建议与可选增强
以下是基于你现有实现的增强建议,属于可选项:
serviceKey 颗粒度 建议改为
platform + ":" + apiKey,避免跨平台互相冷却。请求体截断逻辑修正 你在早期代码里有
requestBody.subSequence(0, 1024)但未赋值的问题。当前版本truncate(...)已经更安全,建议统一使用它。429 之外的可恢复错误码分类 可以将 500/502/503/504 归为“上游暂时性错误”,并引入更短的首次退避,减少等待时间。
告警去重与降噪 当上游故障时告警可能爆炸,建议在告警层做:
- 相同错误码 + 相同 provider + 相同 model 的时间窗口聚合
- 或者告警限频
将冷却状态暴露为指标 例如输出当前 key 是否处于 cooldown、剩余毫秒数,便于运营观察限流情况。
9. 完整代码示例
package com.litongjava.api;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ApiCooldownManager {
private final Logger log = LoggerFactory.getLogger(this.getClass());
private final ConcurrentMap<String, Long> cooldownUntilTimestamps = new ConcurrentHashMap<>();
private final long COOLDOWN_BUFFER_MS = 500; // Add a small buffer to be safe
/**
* Checks if a global cooldown is active for the service. If so, sleeps the current thread.
* @param serviceKey Identifier for the AI service.
* @throws InterruptedException if the thread is interrupted while sleeping.
*/
public void enforceCooldown(String serviceKey) throws InterruptedException {
Long cooldownUntil = cooldownUntilTimestamps.get(serviceKey);
if (cooldownUntil != null) {
long now = System.currentTimeMillis();
if (now < cooldownUntil) {
long delay = cooldownUntil - now;
if (delay > 0) {
log.info("Global cooldown active for service '{}'. Delaying current request for {} ms.", serviceKey, delay);
Thread.sleep(delay);
}
}
}
}
/**
* Records or updates a global cooldown period for a service.
* @param serviceKey Identifier for the AI service.
* @param actualRetryDelayMillis The delay received from the API.
*/
public void recordCooldown(String serviceKey, long actualRetryDelayMillis) {
if (actualRetryDelayMillis <= 0) {
return;
}
long newCooldownUntil = System.currentTimeMillis() + actualRetryDelayMillis + COOLDOWN_BUFFER_MS;
// Update the cooldown time if the new one is further in the future
cooldownUntilTimestamps.merge(serviceKey, newCooldownUntil, Math::max);
log.info("Global cooldown for service '{}' updated. Next requests will be allowed after timestamp: {} (approx. {} ms from now).", serviceKey, newCooldownUntil,
(newCooldownUntil - System.currentTimeMillis()));
}
}
package com.vt.mc.qa.proxy;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.alibaba.fastjson2.JSONArray;
import com.alibaba.fastjson2.JSONObject;
import com.litongjava.api.ApiCooldownManager;
import com.litongjava.chat.UniChatClient;
import com.litongjava.chat.UniChatRequest;
import com.litongjava.chat.UniChatResponse;
import com.litongjava.consts.ModelPlatformName;
import com.litongjava.exception.GenerateException;
import com.litongjava.jfinal.aop.Aop;
import com.litongjava.manim.dao.MvLlmGenerateFailedDao;
import com.litongjava.manim.dao.MvLlmUsageDao;
import com.litongjava.tio.utils.SystemTimer;
import com.litongjava.tio.utils.environment.EnvUtils;
import com.litongjava.tio.utils.json.FastJson2Utils;
import com.vt.mc.qa.services.AppConfigUtils;
import com.vt.mc.qa.utils.LarkBotUtils;
public class UniPredictService {
private static final Logger log = LoggerFactory.getLogger(UniPredictService.class);
// 与 PredictService 对齐:可配置化更好,这里先写死,便于迁移
private static final int MAX_ATTEMPTS = 10; // 你原来是 10 次,这里保持一致
private static final long DEFAULT_RETRY_DELAY_SECONDS = 30;
private static final long DEFAULT_RETRY_DELAY_MILLIS = TimeUnit.SECONDS.toMillis(DEFAULT_RETRY_DELAY_SECONDS);
private static final ApiCooldownManager apiCooldownManager = new ApiCooldownManager();
public UniChatResponse generate(UniChatRequest uniChatRequest) {
// 0) 环境策略前置
applyChinaProxyIfNeeded(uniChatRequest);
Exception lastException = null;
// 1) 选择全局冷却 key:沿用 PredictService 的思路(apiKey 维度)
// 若 apiKey 为空,可降级为 platform 维度,避免 NPE
String serviceKey = uniChatRequest.getApiKey();
String platform = uniChatRequest.getPlatform();
if (serviceKey == null || serviceKey.isEmpty()) {
serviceKey = platform;
}
Long taskId = uniChatRequest.getTaskId();
for (int attempt = 1; attempt <= MAX_ATTEMPTS; attempt++) {
String model = uniChatRequest.getModel();
try {
// 2) 全局冷却:避免并发场景下的 429 风暴
apiCooldownManager.enforceCooldown(serviceKey);
long start = SystemTimer.currTime;
UniChatResponse uniChatResponse = UniChatClient.generate(uniChatRequest);
long end = SystemTimer.currTime;
long elapsed = end - start;
if (uniChatResponse == null) {
log.warn("uniChatResponse is null, taskId={}, attempt={}/{}", taskId, attempt, MAX_ATTEMPTS);
// null 当作可重试异常处理
if (attempt < MAX_ATTEMPTS) {
sleepSafely(DEFAULT_RETRY_DELAY_MILLIS, uniChatRequest, attempt, serviceKey, "null response");
continue;
}
return null;
}
// 3) 成功落库:保留你的落库逻辑
Aop.get(MvLlmUsageDao.class).saveUsage(uniChatRequest, uniChatResponse, elapsed);
log.info("LLM generate success. taskId={}, platform={}, model={}, attempt={}, elapsedMs={}", taskId, platform,
model, attempt, elapsed);
return uniChatResponse;
} catch (GenerateException e) {
lastException = e;
// 4) 失败信息采集
String urlPrefix = e.getUrlPerfix();
String requestJson = e.getRequestBody();
Integer statusCode = e.getStatusCode();
String responseBody = e.getResponseBody();
String stackTrace = toStackTrace(e);
// 5) 失败落库
try {
Aop.get(MvLlmGenerateFailedDao.class).save(uniChatRequest, e, stackTrace);
} catch (Exception saveEx) {
log.error("Failed to save generate failed record. taskId={}", taskId, saveEx);
}
// 6) 分级告警
sendAlertByStatusCode(uniChatRequest, e, urlPrefix, requestJson, statusCode, responseBody, stackTrace);
// 7) 判定是否继续重试
if (!shouldRetry(uniChatRequest, e, statusCode, responseBody)) {
break;
}
// 8) 计算本次退避时间
long retryDelayMillis = computeRetryDelayMillis(uniChatRequest, e, responseBody);
// 9) 如果是 Gemini 429:更新全局冷却
if (ModelPlatformName.GOOGLE.equals(platform) && statusCode != null && statusCode == 429) {
apiCooldownManager.recordCooldown(serviceKey, retryDelayMillis);
}
// 10) 本请求线程也睡眠退避
if (attempt < MAX_ATTEMPTS) {
sleepSafely(retryDelayMillis, uniChatRequest, attempt, serviceKey, "GenerateException " + statusCode);
}
} catch (InterruptedException ie) {
// 来自 enforceCooldown 或 sleepSafely
Thread.currentThread().interrupt();
lastException = ie;
throw new RuntimeException("Interrupted during cooldown/backoff. taskId=" + taskId, ie);
} catch (Exception e) {
lastException = e;
// 其他异常(网络、序列化等),可告警 + 默认退避重试
String name = "tio-boot";
String warningName = "UniPredictService GenericException";
LarkBotUtils.sendException(name, warningName, null, null, null, e.getMessage(), e);
log.error("Generic exception. taskId={}, platform={}, model={}, attempt={}/{}", taskId, platform, model,
attempt, MAX_ATTEMPTS, e);
if (attempt < MAX_ATTEMPTS) {
sleepSafely(DEFAULT_RETRY_DELAY_MILLIS, uniChatRequest, attempt, serviceKey, "generic exception");
}
}
}
// 11) 所有尝试失败:抛出统一异常
if (lastException instanceof RuntimeException) {
throw (RuntimeException) lastException;
}
if (lastException != null) {
throw new RuntimeException("Failed to generate after retries. taskId=" + taskId, lastException);
}
return null;
}
private void applyChinaProxyIfNeeded(UniChatRequest uniChatRequest) {
String platform = uniChatRequest.getPlatform();
boolean china = AppConfigUtils.isChina();
if (china && ModelPlatformName.OPENROUTER.equals(platform)) {
String basePrefixUrl = EnvUtils.getStr("OPENROUTER_PROXY_BASE_URL");
if (basePrefixUrl != null && !basePrefixUrl.isEmpty()) {
uniChatRequest.setApiPrefixUrl(basePrefixUrl);
}
}
}
private boolean shouldRetry(UniChatRequest req, GenerateException e, Integer code, String responseBody) {
if (code == null) {
return true;
}
// Gemini 403 直接抛出(一般是权限/配额策略类错误)
if (ModelPlatformName.GOOGLE.equals(req.getPlatform()) && code == 403) {
throw e;
}
// 400: 特定余额不足错误直接停止重试
if (code == 400 && responseBody != null && !responseBody.isEmpty()) {
try {
JSONObject errorJsonObject = FastJson2Utils.parseObject(responseBody);
if (errorJsonObject.containsKey("error")) {
JSONObject errorObject = errorJsonObject.getJSONObject("error");
if (errorObject != null && errorObject.containsKey("message")) {
String message = errorObject.getString("message");
if (message != null && message.startsWith("Your credit balance is too low to access the")) {
return false;
}
}
}
} catch (Exception parseEx) {
// 解析失败不影响重试决策
}
}
// 其他状态码默认可重试(次数受 MAX_ATTEMPTS 限制)
return true;
}
private long computeRetryDelayMillis(UniChatRequest req, GenerateException e, String responseBody) {
Integer code = e.getStatusCode();
// Gemini 429:优先解析 RetryInfo.retryDelay
if (ModelPlatformName.GOOGLE.equals(req.getPlatform()) && code != null && code == 429) {
return extractRetryDelayMillisFromGeminiError(responseBody, DEFAULT_RETRY_DELAY_MILLIS);
}
// 其他情况:默认退避
return DEFAULT_RETRY_DELAY_MILLIS;
}
private void sleepSafely(long millis, UniChatRequest req, int attempt, String serviceKey, String reason) {
if (millis <= 0) {
return;
}
try {
log.info("Backoff sleep. taskId={}, serviceKey={}, attempt={}/{}, sleepMs={}, reason={}", req.getTaskId(),
serviceKey, attempt, MAX_ATTEMPTS, millis, reason);
Thread.sleep(millis);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new RuntimeException("Interrupted during backoff sleep. taskId=" + req.getTaskId(), ie);
}
}
private void sendAlertByStatusCode(UniChatRequest req, GenerateException e, String urlPrefix, String requestJson,
Integer statusCode, String responseBody, String stackTrace) {
String name = "tio-boot";
String warningName = "UniPredictService LLM GenerateException: " + req.getTaskName();
// 400 余额不足、502 特殊处理、其他带 requestJson
if (statusCode != null && statusCode == 400) {
// 尝试识别余额不足,余额不足直接发告警但不带大请求体
boolean lowCredit = false;
try {
if (responseBody != null) {
JSONObject errorJsonObject = FastJson2Utils.parseObject(responseBody);
JSONObject errorObject = errorJsonObject.getJSONObject("error");
if (errorObject != null) {
String message = errorObject.getString("message");
if (message != null && message.startsWith("Your credit balance is too low to access the")) {
lowCredit = true;
}
}
}
} catch (Exception ignore) {
}
if (lowCredit) {
LarkBotUtils.sendException(name, warningName, urlPrefix, null, statusCode, responseBody, stackTrace);
return;
}
// 其他 400:带 requestJson(可选截断)
LarkBotUtils.sendException(name, warningName, urlPrefix, truncate(requestJson, 4096), statusCode, responseBody,
stackTrace);
return;
}
if (statusCode != null && statusCode == 502) {
LarkBotUtils.sendException(name, warningName, urlPrefix, null, statusCode, responseBody, stackTrace);
return;
}
// 默认:带 requestJson
LarkBotUtils.sendException(name, warningName, urlPrefix, truncate(requestJson, 4096), statusCode, responseBody,
stackTrace);
}
private static String truncate(String s, int maxLen) {
if (s == null) {
return null;
}
if (s.length() <= maxLen) {
return s;
}
return s.substring(0, maxLen);
}
private static String toStackTrace(Throwable t) {
StringWriter sw = new StringWriter();
PrintWriter pw = new PrintWriter(sw);
t.printStackTrace(pw);
return sw.toString();
}
private static long parseRetryDelayToMillis(String retryDelayStr) {
if (retryDelayStr == null || retryDelayStr.isEmpty()) {
return -1;
}
try {
// Gemini 的 retryDelay 通常是类似 "3s"
String numericPart = retryDelayStr.replaceAll("[^0-9]", "");
if (numericPart.isEmpty()) {
return -1;
}
long seconds = Long.parseLong(numericPart);
return TimeUnit.SECONDS.toMillis(seconds);
} catch (NumberFormatException e) {
log.error("Failed to parse retryDelay: {}", retryDelayStr, e);
return -1;
}
}
private static long extractRetryDelayMillisFromGeminiError(String jsonErrorBody, long defaultDelayMillis) {
if (jsonErrorBody == null || jsonErrorBody.isEmpty()) {
return defaultDelayMillis;
}
try {
JSONObject errorResponse = FastJson2Utils.parseObject(jsonErrorBody);
JSONObject errorObj = errorResponse.getJSONObject("error");
if (errorObj != null) {
JSONArray details = errorObj.getJSONArray("details");
if (details != null) {
for (int j = 0; j < details.size(); j++) {
JSONObject detail = details.getJSONObject(j);
String type = detail.getString("@type");
if ("type.googleapis.com/google.rpc.RetryInfo".equals(type)) {
String retryDelayStr = detail.getString("retryDelay");
long parsedDelay = parseRetryDelayToMillis(retryDelayStr);
return parsedDelay > 0 ? parsedDelay : defaultDelayMillis;
}
}
}
}
} catch (Exception ex) {
log.error("Error parsing Gemini error response for retryDelay. body={}", jsonErrorBody, ex);
}
return defaultDelayMillis;
}
}
10. 小结
本节围绕两段核心代码构建了一套生产可用的限流与错误处理方案:
ApiCooldownManager提供跨线程的全局冷却,解决 429 并发风暴UniPredictService负责请求前置策略、重试、分级处理、智能退避与可中断性- 成功与失败均落库,结合告警形成完整的可观测性闭环
- 对 Gemini 429 支持解析 retryDelay,能显著降低盲目重试带来的成本与失败率
这套模式的价值在于:业务代码保持极简,而系统稳定性能力集中在统一入口层持续演进。
