流式生成
在实际业务中,很多场景并不适合“等待完整结果一次性返回”,例如:
- 聊天机器人逐字输出,提升交互体验
- 长文本生成(写作、总结、代码生成),降低首 token 延迟
- Web 前端使用 SSE / WebSocket 实时渲染内容
- 避免长时间阻塞请求导致的超时问题
java-openai 在统一抽象层之上,提供了跨平台一致的流式生成能力。业务侧只需要:
- 构造
UniChatRequest - 注册
UniChatEventListener - 调用
UniChatClient.stream(...)
即可获得统一的流式回调,而无需关心 OpenAI / Google / Anthropic / OpenRouter 等平台在流式协议上的差异。
基本思路
流式生成的核心思想是:
- 客户端发起请求(通常是 HTTP + SSE)
UniChatClient以流式模式请求大模型平台- 底层平台不断返回 增量内容(delta)
java-openai将不同平台的增量数据统一映射为UniChatResponse.delta- 业务侧在
onData回调中消费增量内容并推送给客户端
关键点:
在流式生成场景中,获取大模型输出应使用
chatResponse.getDelta(),而不是chatResponse.getMessage()。
示例:基于 SSE 的流式接口
下面示例展示了一个完整的 SSE 流式接口实现,基于 Tio HTTP Server。
示例代码
package com.litongjava.tio.web.hello.handler;
import java.util.ArrayList;
import java.util.List;
import com.litongjava.chat.PlatformInput;
import com.litongjava.chat.UniChatClient;
import com.litongjava.chat.UniChatEventListener;
import com.litongjava.chat.UniChatMessage;
import com.litongjava.chat.UniChatRequest;
import com.litongjava.chat.UniChatResponse;
import com.litongjava.consts.ModelPlatformName;
import com.litongjava.openrouter.OpenRouterModels;
import com.litongjava.tio.boot.http.TioRequestContext;
import com.litongjava.tio.core.ChannelContext;
import com.litongjava.tio.core.Tio;
import com.litongjava.tio.http.common.HttpRequest;
import com.litongjava.tio.http.common.HttpResponse;
import com.litongjava.tio.http.common.sse.SsePacket;
import com.litongjava.tio.http.server.handler.HttpRequestHandler;
import com.litongjava.tio.http.server.util.SseEmitter;
import okhttp3.Response;
import okhttp3.sse.EventSource;
public class PredictStreamHandler implements HttpRequestHandler {
@Override
public HttpResponse handle(HttpRequest httpRequest) throws Exception {
// 1. 指定平台与模型
PlatformInput platform = new PlatformInput(
ModelPlatformName.OPENROUTER,
OpenRouterModels.XIAOMI_MIMO_V2_FLASH_FREE
);
// 2. 构造消息
List<UniChatMessage> messges = new ArrayList<>();
messges.add(UniChatMessage.buildUser("just say hi"));
// 3. 构造统一请求
UniChatRequest uniChatRequest = new UniChatRequest(platform);
uniChatRequest.setMessages(messges);
// 4. 准备 SSE 响应
HttpResponse httpResponse = TioRequestContext.getResponse();
// 指定tio-boot dispathher handler 收到改请求后不发送给客户端,因为后面需要再onOpen时手动发送
httpResponse.setSend(false);
ChannelContext ctx = httpRequest.channelContext;
// 5. 注册流式事件监听器
UniChatEventListener listener = new UniChatEventListener() {
@Override
public void onOpen(EventSource eventSource, Response response) {
// 建立 SSE 连接
httpResponse.addServerSentEventsHeader();
Tio.send(ctx, httpResponse);
}
@Override
public void onData(UniChatResponse chatResposne) {
// 流式增量内容从 delta 中获取
String content = chatResposne.getDelta().getContent();
Tio.bSend(ctx, new SsePacket(content));
}
@Override
public void onClosed(EventSource eventSource) {
// 关闭 SSE 连接
SseEmitter.closeSeeConnection(ctx);
}
};
// 6. 发起流式请求
UniChatClient.stream(uniChatRequest, listener);
return httpResponse;
}
}
客户端调用示例
使用 curl 直接访问流式接口:
curl http://localhost/predict/stream
示例输出:
data:
data:Hi
data: there
data:!
data:😊
data: How can
data: I help
data: you
data: today?
data:
data:
可以看到,大模型的回复被拆分成多个片段,逐条通过 SSE 推送给客户端。
核心要点说明
1. 为什么使用 getDelta() 而不是 getMessage()
在 java-openai 的统一模型中:
非流式请求
- 返回一次完整结果
- 使用
UniChatResponse.message
流式请求
- 每次回调只包含“新增的一小段内容”
- 使用
UniChatResponse.delta
因此:
chatResponse.getMessage()- 适用于
UniChatClient.generate(...)
- 适用于
chatResponse.getDelta()- 适用于
UniChatClient.stream(...)
- 适用于
这是一个非常重要的区分点。
2. UniChatEventListener 生命周期
UniChatEventListener 通常包含三个关键回调:
onOpen(...)流式连接建立成功。 在这里初始化 SSE 头、发送首次响应。onData(UniChatResponse response)每收到一次模型的增量输出,就会触发一次。 业务应在这里消费response.getDelta()并向前端推送。onClosed(...)流式生成结束或连接关闭。 用于释放资源、关闭 SSE 连接。
3. 平台差异的屏蔽
不同平台在流式返回上的差异很大,例如:
- OpenAI:delta.content
- Anthropic:content block
- Google:parts / text fragments
- OpenRouter:聚合多种平台返回格式
这些差异全部由 UniChatClient 内部处理并统一映射为:
UniChatResponse.delta
业务侧只需要处理一种结构。
适用场景建议
推荐在以下场景中使用流式生成:
- 聊天机器人 / AI 助手
- 实时写作、润色、翻译
- 代码补全与生成
- 长文本摘要或报告生成
- Web UI 强交互场景
而在以下场景中,非流式更合适:
- 需要一次性完整 JSON 结构输出
- 后台任务或批处理
- 对响应顺序和完整性要求极高的系统接口
小结
java-openai提供了统一的跨平台流式生成能力- 流式场景下,必须使用
UniChatResponse.getDelta()获取模型输出 UniChatClient.stream(...)+UniChatEventListener是标准使用方式- SSE / WebSocket / 其他推送机制都可以在监听器中自由组合
