主动介入
1. 目标
在实时语音场景中,用户有时会在官提问后长时间沉默。为了让对话自然推进,系统需要在用户长时间未开始回答时,主动由模型介入,引导用户继续作答、补充思路,或换一种更容易回答的方式继续交流。
本方案的目标是:
- 在用户长时间未开始回答时自动触发主动介入。
- 主动介入由模型自然生成,而不是后端写死固定提醒文案。
- 避免把“麦克风仍在持续上传静音音频”误判为“用户正在回答”。
- 提供开关,决定是否启用主动介入能力。
- 保持实现边界清晰,尽量少侵入现有实时语音桥接结构。
2. 核心思路
主动介入的判断不应基于“前端是否还在持续上传音频包”,因为在麦克风开启时,即便用户没有说话,也可能持续产生静音帧、环境噪声数据或浏览器维持的音频流。
因此,更合理的判断方式是:
- 以模型上一轮回复结束作为计时起点。
- 在此之后,只有当检测到用户真实开始回答时,才停止计时。
- 如果在超时时间内始终没有检测到用户真实开始回答,则由后端向模型发送一段“当前用户已沉默若干秒”的文本说明,让模型自行生成自然的追问或引导。
换句话说,主动介入的本质不是“系统替模型说一句固定提醒”,而是“系统把沉默事实告诉模型,由模型决定如何自然接话”。
3. 实现位置
主动介入建议放在回调层实现,也就是负责:
- 接收模型返回事件
- 向前端发送文本和音频
- 感知一轮对话状态变化
这一层最适合的实现载体是回调对象本身,因为它天然能看到:
- 模型输出事件
- 模型转写事件
- 对话轮次完成事件
- 打断事件
- 会话关闭事件
这样做的好处是:
- 不需要把大量业务状态塞进 handler。
- 不需要让 bridge 本身承担过多策略。
- 可以直接基于模型回调事件维护“是否正在等待用户回答”的状态。
4. 关键状态定义
主动介入依赖几个核心状态。
4.1 是否开启主动介入
这是一个布尔开关,用于决定当前会话是否启用该能力。
作用:
- 便于灰度发布
- 便于在不同模式下选择是否启用
- 便于定位问题时临时关闭
4.2 是否处于“等待用户回答”状态
该状态表示:
- 模型已经完成上一轮问题或回应
- 当前轮到用户开始回答
只有在这个状态下,系统才会开始计时,并检测是否需要主动介入。
如果不在这个状态,即使时间流逝很久,也不应该触发主动介入。
4.3 模型上一轮完成时间
该时间点用于作为沉默计时起点。
一旦模型结束当前一轮输出,就记录这一时间。
后续判断主动介入时,不再比较“最近一次音频上传时间”,而是比较“从模型完成回复到现在过了多久”。
4.4 用户最近一次真实开口时间
该时间并不是必须作为主动介入的主判断条件,但可以作为状态辅助信息存在。
它表示最近一次确认用户已经开始真实回答的时间,用于:
- 记录用户真实参与的时机
- 辅助日志排查
- 未来扩展更复杂的策略
4.5 最近一次主动介入时间
为了避免系统在短时间内连续多次催促用户,需要记录最近一次主动介入的时间,并设置最小重复间隔。
这样可以避免:
- 每秒都触发一次介入
- 模型连续不断地重复催促
- 用户体验变差
4.6 最近一轮官内容与用户内容
主动介入的提示语不是固定模板,而是要根据上下文让模型自行生成自然表达。
因此需要记录:
- 最近一轮官内容
- 最近一轮用户内容
这样在触发主动介入时,可以把上下文一并告诉模型,让模型决定是提醒、追问、给提示,还是建议先给简短结论。
5. 什么事件会进入“等待用户回答”状态
进入等待用户回答状态的标准是:
模型一轮输出结束。
可用于判断这一点的事件包括:
- assistant 一轮完成事件
- 通用 turn 完成事件
- bridge 显式上报的 assistant turnComplete 事件
这些事件的语义都是一致的:当前模型已经说完这一轮内容,接下来应该轮到用户回答。
一旦收到这类事件,就需要:
- 将状态切换为“等待用户回答”。
- 记录“模型上一轮完成时间”。
- 从此时开始计算沉默时长。
6. 什么事件表示“用户真的开始回答了”
这是整个方案中最重要的设计点。
不能把“音频包还在上传”当作用户已经回答,因为那可能只是静音。
真正能说明用户开始回答的信号应当是:
6.1 语音开始事件
如果模型或桥接层能够提供“检测到用户开始说话”的事件,那么这是最灵敏的信号。
它的优点是:
- 能在用户刚开口时就结束等待
- 响应快
- 不必等完整转写文本出来
6.2 用户输入转写事件
如果系统支持用户侧实时转写,那么一旦收到用户转写内容,就说明用户确实已经开始表达了。
这是非常可靠的信号。
它的优点是:
- 误判少
- 能直接记录最近用户内容
缺点是:
- 相比“开始说话”事件略慢一点,因为需要识别出文字
6.3 用户主动发送文本输入
如果系统支持文本输入,那么用户主动发来的文本也应被视为用户已经开始回答。
6.4 不应作为真实开口依据的事件
以下事件不应被当作“用户已经开始回答”的依据:
- 持续收到音频流
- 音频流结束事件
- 麦克风仍然处于开启状态
这些都不能说明用户真的开口了。
7. 主动介入的触发条件
主动介入必须同时满足以下条件:
条件一:功能已开启
如果会话没有开启主动介入,则永不触发。
条件二:当前处于等待用户回答状态
只有当模型已完成上一轮回复,且轮到用户回答时,才允许主动介入。
条件三:从模型上一轮完成到现在已超过超时时间
例如:
- 8 秒未开始回答时触发第一次主动介入
这里的超时时间应当可配置。
条件四:距离上一次主动介入已超过最小间隔
例如:
- 上一次主动介入刚刚发生,则本次先不触发
- 只有超过一定时间后,才允许再次主动介入
8. 主动介入的内容生成方式
主动介入不建议由后端直接写死一句话,例如:
“请继续回答。”
这种方式有几个问题:
- 生硬
- 无法结合上下文
- 不能根据用户已经说过的内容自然追问
- 不同题型下效果很差
更好的方式是:
由后端组织一段说明文本,把“当前发生了什么”告诉模型,包括:
- 当前是实时语音场景
- 用户自官上一轮结束后已沉默若干秒
- 最近一轮官说了什么
- 最近用户说了什么
- 希望模型自然推进对话,不要机械重复
然后由模型直接生成下一句对用户说的话。
这样主动介入可以根据上下文自然演化成不同策略,例如:
- 温和提醒继续回答
- 给一点启发
- 从用户刚才提到的内容继续追问
- 提议先给一个简短结论
- 视情况换一种更容易的追问方式
9. 为什么不建议直接基于音频流判断沉默
在实时语音场景中,浏览器通常会持续发送音频帧。即便用户没说话,也可能仍然持续上行:
- 静音 PCM
- 背景噪声
- 房间环境音
- 降噪后的微小波动
如果把“只要还在收音频,就认为用户活跃”,会导致两个问题:
问题一:主动介入永远触发不了
因为系统误以为用户一直在“回答”。
问题二:状态语义混乱
“麦克风有数据”与“用户开始回答”是两件事,不应混为一谈。
因此,沉默检测必须基于更有语义的事件,而不是底层流量本身。
10. 会话生命周期中的行为
10.1 会话开始时
在会话初始化时,应完成:
- 主动介入开关配置
- 超时时间配置
- 重复触发最小间隔配置
- 启动主动介入检测任务
10.2 模型输出过程中
只要模型有持续输出,就更新模型最近活动时间与最近官内容。
但此时不能开始等待用户回答,因为模型还没说完。
10.3 模型一轮完成时
切换到等待用户回答状态,并开始沉默计时。
10.4 用户开始回答时
结束等待状态,停止本轮沉默检测。
10.5 会话关闭时
必须停止该会话的主动介入检查任务,并释放相关状态。
如果不停止,会产生以下问题:
- 已关闭会话仍在继续检查
- 已关闭会话仍可能触发主动介入
- 无法释放会话对象引用,造成资源泄漏
11. 定时检测任务的建议实现方式
主动介入需要周期性检查是否满足触发条件。
这里推荐使用:
- 共享调度器负责定时检查
- 每个会话注册自己的检查任务
- 会话关闭时取消对应任务
不建议为每个会话都单独创建一个独立线程池,因为这样在会话规模变大时会导致线程数膨胀。
更合理的思路是:
- 使用共享的定时调度能力
- 单个会话只保留一个可取消的定时任务句柄
- 关闭会话时只取消该会话任务,不影响其他会话
12. 主动介入与虚拟线程的关系
主动介入分成两个部分:
12.1 周期性检查
这是一个轻量、固定频率的调度任务,适合交给共享调度器处理。
这部分不需要为了每个会话专门创建虚拟线程。
12.2 真正执行“把提示发给模型”
这一动作本质上是一次单独的发送操作,如果该发送过程可能阻塞,那么可以考虑使用虚拟线程来承载这类单次任务。
因此,更合理的做法是:
- 调度层使用共享调度器
- 真正触发模型介入时,如有需要,可交给虚拟线程执行
13. 配置项建议
建议至少提供以下配置项:
13.1 是否启用主动介入
用于总开关控制。
13.2 首次主动介入超时时间
例如 8 秒。
13.3 重复主动介入最小间隔
例如 8 秒或更长。
13.4 是否允许多次主动介入
有些场景只希望提醒一次,有些场景允许多次追问。
13.5 主动介入提示风格
虽然最终由模型决定如何说,但可以通过系统说明控制语气,例如:
- 更温和
- 更像官
- 更偏启发式
- 更偏推进式
14. 推荐日志埋点
为了便于联调,建议记录以下关键日志。
14.1 进入等待用户回答状态
确认模型一轮完成后是否真的开始了沉默计时。
14.2 检测到用户真实开口
确认哪些事件真正结束了等待状态。
14.3 触发主动介入
包括:
- 会话标识
- 沉默时长
- 最近官内容摘要
- 最近用户内容摘要
14.4 主动介入被跳过的原因
例如:
- 功能未开启
- 当前不在等待用户回答状态
- 尚未到超时时间
- 距离上次主动介入间隔不足
- 模型发送器未绑定
这些日志能极大提升排查效率。
15. 联调时的重点验证项
15.1 模型一轮结束后,系统是否进入等待状态
如果没有进入等待状态,后续再久也不会触发主动介入。
15.2 用户没有说话时,是否按预期超时触发
例如模型说完后 8 秒用户沉默,应看到主动介入被触发。
15.3 用户一旦开口,是否立即停止等待
无论通过语音开始事件还是用户转写,都应该结束沉默计时。
15.4 主动介入是否会过于频繁
若频率太高,需要调整重复间隔。
15.5 主动介入内容是否自然
如果内容太生硬,应调整发给模型的系统说明,而不是把后端文案写死。
16. 最终方案总结
本方案的关键不在于“是否还在收音频流”,而在于:
- 模型是否已经结束上一轮回复
- 用户是否已经被识别为真正开始回答
因此,主动介入的最合理判定方式是:
- 模型一轮结束后进入等待用户回答状态。
- 从模型上一轮完成时间开始计时。
- 若在设定时间内始终没有检测到用户真实开始回答,则触发主动介入。
- 主动介入不是固定提醒,而是向模型描述当前沉默事实与上下文,让模型自然接话。
- 会话关闭时必须停止检测任务并清理资源。
这样可以在保持实时性与自然感的同时,避免把静音音频误判成用户正在回答,也能让模型更像真人官一样推动对话继续。
代码实现
CallbackExecutorService
package com.litongjava.voice.agent.callback;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
public class CallbackExecutorService {
public static final ScheduledExecutorService SHARED_SCHEDULER =
Executors.newScheduledThreadPool(1, r -> {
Thread t = new Thread(r, "ws-realtime-bridge-callback-scheduler");
t.setDaemon(true);
return t;
});
}
CallbackPromptUtils
package com.litongjava.voice.agent.callback;
public class CallbackPromptUtils {
public static String buildProactiveInterventionPrompt(String lastAssistantText, String lastUserText, long idleMs) {
long idleSec = Math.max(1L, idleMs / 1000L);
String assistantContext = emptyToDefault(lastAssistantText, "无");
String userContext = emptyToDefault(lastUserText, "无");
return "" + "系统提示:当前是实时语音场景。\n"
//
+ "模型刚刚已经完成了一轮提问或回应,直到现在用户已经沉默了 " + idleSec + " 秒,仍未开始正式回答。\n"
//
+ "请你根据当前上下文主动介入,但要自然、简洁、像真人模型,不要机械重复。\n"
//
+ "你的目标是推动对话继续进行。\n" + "你可以视上下文选择:\n"
//
+ "1. 温和提醒用户继续回答;\n"
//
+ "2. 如果用户可能卡住了,给一个轻微引导;\n"
//
+ "3. 如果用户已回答过部分内容,可基于他的内容继续追问;\n"
//
+ "4. 如果问题较难,也可以建议先给简短结论再展开。\n"
//
+ "请直接输出你要对用户说的话,不要解释策略。\n"
//
+ "最近一轮模型内容:" + assistantContext + "\n"
//
+ "最近用户内容:" + userContext;
}
private static String emptyToDefault(String value, String dft) {
return value == null || value.trim().isEmpty() ? dft : value.trim();
}
}
WsRealtimeBridgeCallback
package com.litongjava.voice.agent.callback;
import java.nio.file.Path;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import com.litongjava.media.NativeMedia;
import com.litongjava.tio.core.ChannelContext;
import com.litongjava.tio.core.Tio;
import com.litongjava.tio.utils.json.JsonUtils;
import com.litongjava.tio.websocket.common.WebSocketResponse;
import com.litongjava.voice.agent.audio.AudioFinishCallback;
import com.litongjava.voice.agent.audio.SessionAudioRecorder;
import com.litongjava.voice.agent.bridge.RealtimeBridgeCallback;
import com.litongjava.voice.agent.bridge.RealtimeSetup;
import com.litongjava.voice.agent.consts.VoiceAgentConst;
import com.litongjava.voice.agent.model.WsVoiceAgentResponseMessage;
import com.litongjava.voice.agent.utils.ChannelContextUtils;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class WsRealtimeBridgeCallback implements RealtimeBridgeCallback {
private volatile ScheduledFuture<?> proactiveFuture;
private final ChannelContext channelContext;
private final String sessionId;
/**
* 是否开启主动介入
*/
private volatile boolean proactiveInterventionEnabled = false;
/**
* assistant 完成回复后,用户沉默多久开始主动介入
*/
private volatile long proactiveInterventionTimeoutMs = 8_000L;
/**
* 两次主动介入之间的最小间隔
*/
private volatile long proactiveInterventionRepeatMs = 8_000L;
/**
* 当前是否处于“assistant 已说完,等待用户回答”的阶段
*/
private volatile boolean waitingForUserAnswer = false;
/**
* 最近一次 assistant 完成一轮回复的时间
*/
private volatile long lastAssistantTurnCompleteAt = 0L;
/**
* 最近一次真实检测到用户说话/输入文本的时间
*/
private volatile long lastRealUserSpeechAt = 0L;
/**
* 最近一次 assistant 活动时间
*/
private volatile long lastAssistantActivityAt = 0L;
/**
* 最近一次任意活动时间
*/
private volatile long lastActivityAt = System.currentTimeMillis();
/**
* 最近一次主动介入时间
*/
private volatile long lastProactiveInterventionAt = 0L;
/**
* 最近一次 assistant 文本
*/
private volatile String lastAssistantText = "";
/**
* 最近一次用户文本
*/
private volatile String lastUserText = "";
/**
* 是否已经关闭
*/
private volatile boolean closed = false;
/**
* 由 handler 注入,真正把文本发送给模型
*/
private volatile Consumer<String> modelTextSender;
private final AtomicBoolean proactiveTaskStarted = new AtomicBoolean(false);
public WsRealtimeBridgeCallback(ChannelContext channelContext) {
this.channelContext = channelContext;
this.sessionId = ChannelContextUtils.key(channelContext);
}
public void bindModelTextSender(Consumer<String> modelTextSender) {
this.modelTextSender = modelTextSender;
}
public void configureProactiveIntervention(boolean enabled, long timeoutMs, long repeatMs) {
this.proactiveInterventionEnabled = enabled;
if (timeoutMs > 0) {
this.proactiveInterventionTimeoutMs = timeoutMs;
}
if (repeatMs > 0) {
this.proactiveInterventionRepeatMs = repeatMs;
}
}
/**
* 仅表示有音频流在上传,不代表用户真的开口。
* 所以这里不改变 waitingForUserAnswer,不参与“沉默结束”判断。
*/
public void onUserAudioActivity() {
this.lastActivityAt = System.currentTimeMillis();
}
/**
* 用户明确发送文本输入,视为真实回答。
*/
public void onUserTextActivity(String text) {
this.lastUserText = safeText(text);
markRealUserSpeechActivity("user_text_input");
}
@Override
public void sendText(String json) {
inspectServerEvent(json);
WebSocketResponse wsResp = WebSocketResponse.fromText(json, VoiceAgentConst.CHARSET);
Tio.send(channelContext, wsResp);
}
@Override
public void sendBinary(byte[] bytes) {
try {
SessionAudioRecorder.appendModelPcm(sessionId, bytes);
} catch (Exception ex) {
log.warn("record model pcm failed: {}", ex.getMessage());
}
markAssistantActivity();
WebSocketResponse wsResp = WebSocketResponse.fromBytes(bytes);
Tio.send(channelContext, wsResp);
}
@Override
public void close(String reason) {
closed = true;
try {
proactiveFuture.cancel(true);
} catch (Exception e) {
log.warn("shutdown scheduler failed: {}", e.getMessage());
}
AudioFinishCallback audioFinishCallback = new AudioFinishCallback() {
@Override
public void done(Path audioFile) {
String wavFilePath = audioFile.toString();
NativeMedia.toMp3(wavFilePath);
}
};
SessionAudioRecorder.stop(sessionId, audioFinishCallback);
Tio.remove(channelContext, reason);
}
@Override
public void session(String sessionId) {
}
/**
* 如果 bridge 显式调用了 turnComplete,这里直接用。
*/
@Override
public void turnComplete(String role, String text) {
if (closed) {
return;
}
if ("assistant".equalsIgnoreCase(role) || "model".equalsIgnoreCase(role)) {
this.lastAssistantText = safeText(text);
enterWaitingForUserAnswer("turnComplete(role=assistant)");
} else if ("user".equalsIgnoreCase(role)) {
this.lastUserText = safeText(text);
markRealUserSpeechActivity("turnComplete(role=user)");
}
}
@Override
public void start(RealtimeSetup setup) {
startProactiveTaskIfNeeded();
}
private void startProactiveTaskIfNeeded() {
if (!proactiveTaskStarted.compareAndSet(false, true)) {
return;
}
proactiveFuture = CallbackExecutorService.SHARED_SCHEDULER.scheduleAtFixedRate(() -> {
try {
checkAndTriggerProactiveIntervention();
} catch (Throwable e) {
log.warn("checkAndTriggerProactiveIntervention error, sessionId:{}", sessionId, e);
}
}, 1, 1, TimeUnit.SECONDS);
}
private void checkAndTriggerProactiveIntervention() {
if (closed) {
return;
}
if (!proactiveInterventionEnabled) {
return;
}
if (!waitingForUserAnswer) {
return;
}
Consumer<String> sender = this.modelTextSender;
if (sender == null) {
return;
}
if (lastAssistantTurnCompleteAt <= 0L) {
return;
}
long now = System.currentTimeMillis();
long idleMs = now - lastAssistantTurnCompleteAt;
if (idleMs < proactiveInterventionTimeoutMs) {
return;
}
long sinceLastIntervention = now - lastProactiveInterventionAt;
if (lastProactiveInterventionAt > 0L && sinceLastIntervention < proactiveInterventionRepeatMs) {
return;
}
String interventionPrompt = CallbackPromptUtils.buildProactiveInterventionPrompt(lastAssistantText, lastUserText,
idleMs);
log.info(
"trigger proactive intervention, sessionId:{}, idleMs:{}, waitingForUserAnswer:{}, lastAssistantTurnCompleteAt:{}",
sessionId, idleMs, waitingForUserAnswer, lastAssistantTurnCompleteAt);
lastProactiveInterventionAt = now;
try {
sender.accept(interventionPrompt);
markAssistantActivity();
} catch (Exception e) {
log.warn("modelTextSender.accept failed, sessionId:{}, prompt:{}", sessionId, interventionPrompt, e);
}
}
private void inspectServerEvent(String json) {
if (json == null || json.isEmpty()) {
return;
}
try {
WsVoiceAgentResponseMessage msg = JsonUtils.parse(json, WsVoiceAgentResponseMessage.class);
if (msg == null || msg.getType() == null) {
return;
}
String type = msg.getType();
if ("transcript_in".equalsIgnoreCase(type)) {
this.lastUserText = safeText(msg.getText());
markRealUserSpeechActivity("transcript_in");
return;
}
if ("speech_started".equalsIgnoreCase(type)) {
markRealUserSpeechActivity("speech_started");
return;
}
if ("transcript_out".equalsIgnoreCase(type) || "text".equalsIgnoreCase(type)) {
this.lastAssistantText = safeText(msg.getText());
markAssistantActivity();
return;
}
if ("assistant_turn_start".equalsIgnoreCase(type)) {
markAssistantActivity();
return;
}
if ("assistant_turn_complete".equalsIgnoreCase(type) || "turn_complete".equalsIgnoreCase(type)) {
enterWaitingForUserAnswer(type);
return;
}
if ("assistant_turn_interrupt".equalsIgnoreCase(type) || "interrupted".equalsIgnoreCase(type)) {
markRealUserSpeechActivity(type);
return;
}
if ("error".equalsIgnoreCase(type) || "go_away".equalsIgnoreCase(type)) {
markAssistantActivity();
}
} catch (Exception e) {
log.debug("inspectServerEvent parse failed, sessionId:{}, json:{}", sessionId, json);
}
}
private void enterWaitingForUserAnswer(String reason) {
long now = System.currentTimeMillis();
this.waitingForUserAnswer = true;
this.lastAssistantTurnCompleteAt = now;
this.lastActivityAt = now;
log.info("enter waitingForUserAnswer, sessionId:{}, reason:{}, proactiveEnabled:{}, lastAssistantText:{}",
sessionId, reason, proactiveInterventionEnabled, shortText(lastAssistantText));
}
private void markRealUserSpeechActivity(String reason) {
long now = System.currentTimeMillis();
this.lastRealUserSpeechAt = now;
this.lastActivityAt = now;
this.waitingForUserAnswer = false;
log.info("mark real user speech activity, sessionId:{}, reason:{}, lastUserText:{}", sessionId, reason,
shortText(lastUserText));
}
private void markAssistantActivity() {
long now = System.currentTimeMillis();
this.lastAssistantActivityAt = now;
this.lastActivityAt = now;
}
private String safeText(String text) {
return text == null ? "" : text.trim();
}
private String shortText(String text) {
if (text == null) {
return "";
}
String s = text.trim();
return s.length() <= 120 ? s : s.substring(0, 120) + "...";
}
}
VoiceSocketHandler
package com.litongjava.voice.agent.handler;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import com.litongjava.tio.consts.TioConst;
import com.litongjava.tio.core.ChannelContext;
import com.litongjava.tio.core.Tio;
import com.litongjava.tio.http.common.HttpRequest;
import com.litongjava.tio.http.common.HttpResponse;
import com.litongjava.tio.utils.json.JsonUtils;
import com.litongjava.tio.websocket.common.WebSocketRequest;
import com.litongjava.tio.websocket.common.WebSocketResponse;
import com.litongjava.tio.websocket.common.WebSocketSessionContext;
import com.litongjava.tio.websocket.server.handler.IWebSocketHandler;
import com.litongjava.voice.agent.audio.SessionAudioRecorder;
import com.litongjava.voice.agent.bridge.RealtimeModelBridge;
import com.litongjava.voice.agent.bridge.RealtimeModelBridgeFactory;
import com.litongjava.voice.agent.bridge.RealtimeSetup;
import com.litongjava.voice.agent.callback.WsRealtimeBridgeCallback;
import com.litongjava.voice.agent.model.WsVoiceAgentRequestMessage;
import com.litongjava.voice.agent.model.WsVoiceAgentResponseMessage;
import com.litongjava.voice.agent.model.WsVoiceAgentType;
import com.litongjava.voice.agent.utils.ChannelContextUtils;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class VoiceSocketHandler implements IWebSocketHandler {
/**
* 一个前端连接一个 bridge
*/
private static final Map<String, RealtimeModelBridge> BRIDGES = new ConcurrentHashMap<>();
/**
* 一个前端连接一个 callback
*/
private static final Map<String, WsRealtimeBridgeCallback> CALLBACKS = new ConcurrentHashMap<>();
/**
* 主动介入总开关
*/
private static final boolean ENABLE_PROACTIVE_INTERVENTION = true;
/**
* assistant 完成回复后,用户沉默多久开始主动介入
*/
private static final long PROACTIVE_INTERVENTION_TIMEOUT_MS = 8_000L;
/**
* 两次主动介入之间的最小间隔
*/
private static final long PROACTIVE_INTERVENTION_REPEAT_MS = 8_000L;
@Override
public HttpResponse handshake(HttpRequest httpRequest, HttpResponse response, ChannelContext channelContext)
throws Exception {
log.info("请求信息: {}", httpRequest);
return response;
}
@Override
public void onAfterHandshaked(HttpRequest httpRequest, HttpResponse httpResponse, ChannelContext channelContext)
throws Exception {
log.info("握手完成: {}", httpRequest);
}
@Override
public Object onClose(WebSocketRequest wsRequest, byte[] bytes, ChannelContext channelContext) throws Exception {
String sessionKey = ChannelContextUtils.key(channelContext);
cleanupSession(channelContext, sessionKey, "客户端主动关闭连接");
return null;
}
@Override
public Object onBytes(WebSocketRequest wsRequest, byte[] bytes, ChannelContext channelContext) throws Exception {
String sessionKey = ChannelContextUtils.key(channelContext);
// 这里只表示“麦克风流有数据”,不代表用户真的开口,所以只做轻量触达
WsRealtimeBridgeCallback callback = CALLBACKS.get(sessionKey);
if (callback != null) {
callback.onUserAudioActivity();
}
try {
SessionAudioRecorder.appendUserPcm(sessionKey, bytes);
} catch (Exception ex) {
log.warn("appendUserPcm failed: {}", ex.getMessage());
}
RealtimeModelBridge bridge = BRIDGES.get(sessionKey);
if (bridge != null) {
try {
bridge.sendPcm16k(bytes);
} catch (Exception e) {
log.error("bridge.sendPcm16k error, sessionKey:{}", sessionKey, e);
}
} else {
log.warn("bridge not found when onBytes, sessionKey:{}", sessionKey);
}
return null;
}
@Override
public Object onText(WebSocketRequest wsRequest, String text, ChannelContext channelContext) throws Exception {
WebSocketSessionContext wsSessionContext = (WebSocketSessionContext) channelContext.get();
String path = wsSessionContext.getHandshakeRequest().getRequestLine().path;
log.info("路径:{},收到消息:{}", path, text);
String rawText = text == null ? "" : text.trim();
WsVoiceAgentRequestMessage msg = null;
try {
msg = JsonUtils.parse(rawText, WsVoiceAgentRequestMessage.class);
} catch (Exception je) {
log.debug("收到非 JSON 文本或无法解析为 WsMessage: {}", je.getMessage());
return null;
} catch (Throwable e) {
log.error("解析收到的消息异常", e);
return null;
}
String sessionKey = ChannelContextUtils.key(channelContext);
RealtimeModelBridge bridge = BRIDGES.get(sessionKey);
if (bridge == null && msg != null && msg.getType() != null) {
WsVoiceAgentType typeEnum = parseType(msg.getType());
if (typeEnum == WsVoiceAgentType.SETUP) {
String platform = msg.getPlatform();
String systemPrompt = msg.getSystem_prompt();
String userPrompt = msg.getUser_prompt();
String jobDescription = msg.getJob_description();
String resume = msg.getResume();
String questions = msg.getQuestions();
String greeting = msg.getGreeting();
RealtimeSetup realtimeSetup = new RealtimeSetup(systemPrompt, userPrompt, jobDescription, resume, questions,
greeting);
connectLLM(channelContext, platform, realtimeSetup);
WsVoiceAgentResponseMessage resp = new WsVoiceAgentResponseMessage(WsVoiceAgentType.SETUP_RECEIVED.name());
resp.setSessionId(sessionKey);
String json = toJson(resp);
Tio.send(channelContext, WebSocketResponse.fromText(json, TioConst.UTF_8));
} else {
log.warn("bridge not ready and first message is not SETUP, sessionKey:{}, type:{}", sessionKey, msg.getType());
}
return null;
}
if (bridge == null) {
String respJson = toJson(new WsVoiceAgentResponseMessage(WsVoiceAgentType.ERROR.name(), "no bridge"));
Tio.send(channelContext, WebSocketResponse.fromText(respJson, TioConst.UTF_8));
return null;
}
try {
if (msg != null && msg.getType() != null) {
WsVoiceAgentType typeEnum = parseType(msg.getType());
if (typeEnum != null) {
switch (typeEnum) {
case AUDIO_END: {
bridge.endAudioInput();
break;
}
case TEXT: {
String userText = msg.getText() == null ? "" : msg.getText();
WsRealtimeBridgeCallback callback = CALLBACKS.get(sessionKey);
if (callback != null) {
callback.onUserTextActivity(userText);
}
bridge.sendText(userText);
break;
}
case CLOSE: {
cleanupSession(channelContext, sessionKey, "client requested close");
break;
}
default: {
Tio.send(channelContext,
WebSocketResponse.fromText(
toJson(new WsVoiceAgentResponseMessage(WsVoiceAgentType.IGNORED.name(), rawText)),
TioConst.UTF_8));
break;
}
}
} else {
log.debug("未知的 type: {}", msg.getType());
}
}
} catch (Exception e) {
log.error("onText handle error, sessionKey:{}", sessionKey, e);
}
return null;
}
private void connectLLM(ChannelContext channelContext, String platform, RealtimeSetup setup) {
String sessionKey = ChannelContextUtils.key(channelContext);
WsRealtimeBridgeCallback callback = new WsRealtimeBridgeCallback(channelContext);
callback.configureProactiveIntervention(ENABLE_PROACTIVE_INTERVENTION, PROACTIVE_INTERVENTION_TIMEOUT_MS,
PROACTIVE_INTERVENTION_REPEAT_MS);
try {
SessionAudioRecorder.start(sessionKey, 16000, 24000);
} catch (Exception e) {
log.warn("start recorder failed: {}", e.getMessage());
}
RealtimeModelBridge bridge = RealtimeModelBridgeFactory.createBridge(platform, callback);
callback.bindModelTextSender(prompt -> {
try {
RealtimeModelBridge b = BRIDGES.get(sessionKey);
if (b != null) {
b.sendText(prompt);
} else {
log.warn("bridge not found when proactive intervention, sessionKey:{}", sessionKey);
}
} catch (Exception e) {
log.warn("bridge.sendText failed, sessionKey:{}, prompt:{}", sessionKey, prompt, e);
}
});
callback.start(setup);
CALLBACKS.put(sessionKey, callback);
BRIDGES.put(sessionKey, bridge);
try {
bridge.connect(setup);
} catch (Exception e) {
log.error("bridge.connect error, sessionKey:{}", sessionKey, e);
cleanupSession(channelContext, sessionKey, "bridge connect failed");
}
}
private void cleanupSession(ChannelContext channelContext, String sessionKey, String reason) {
WsRealtimeBridgeCallback callback = CALLBACKS.remove(sessionKey);
RealtimeModelBridge bridge = BRIDGES.remove(sessionKey);
if (bridge != null) {
try {
bridge.close();
} catch (Exception e) {
log.warn("bridge.close error, sessionKey:{}", sessionKey, e);
}
return;
}
if (callback != null) {
try {
callback.close(reason);
} catch (Exception e) {
log.warn("callback.close error, sessionKey:{}", sessionKey, e);
}
return;
}
try {
Tio.remove(channelContext, reason);
} catch (Exception e) {
log.warn("Tio.remove error, sessionKey:{}", sessionKey, e);
}
}
private WsVoiceAgentType parseType(String type) {
if (type == null) {
return null;
}
try {
return WsVoiceAgentType.valueOf(type.trim().toUpperCase());
} catch (Exception e) {
return null;
}
}
private String toJson(WsVoiceAgentResponseMessage wsVoiceAgentResponseMessage) {
return JsonUtils.toSkipNullJson(wsVoiceAgentResponseMessage);
}
}
