Server-Sent Events (SSE)

SSE 简介

Server-Sent Events(SSE)是一种允许服务器主动向客户端发送信息的技术。与 WebSocket 不同,SSE 是单向通信,仅服务器能向客户端发送数据。这使得 SSE 非常适合于需要服务器实时推送数据但客户端不需要发送信息的场景,例如实时通知和更新。

tio-boot 发送 Server Event Source

整合 SSE 到 tio-boot 框架中可以让你的应用具备实时数据推送的能力。以下是在 tio-boot 框架中创建一个简单的 SSE 应用的步骤和代码示例:

Controller 发送 SSE 数据

步骤 1: 创建 SSE Controller

首先,创建一个名为 SseController 的类,并用 @RequestPath 注解标记该类和方法。该方法将处理来自 /sse 路径的 SSE 请求。

package com.litongjava.ai.chat.AController;

import com.litongjava.tio.core.Tio;
import com.litongjava.tio.http.common.HttpRequest;
import com.litongjava.tio.http.common.HttpResponse;
import com.litongjava.tio.http.server.annotation.RequestPath;
import com.litongjava.tio.http.server.sse.SsePacket;
import com.litongjava.tio.server.ServerChannelContext;

import lombok.extern.slf4j.Slf4j;

@RequestPath("/sse")
@Slf4j
public class SseController {

  @RequestPath
  public HttpResponse conversation(HttpRequest request, ServerChannelContext channelContext) {
    // ... (代码继续)
  }
}
步骤 2: 设置 SSE 请求头并发送响应

conversation 方法中,首先设置 SSE 请求头,并发送一个空的响应来初始化 SSE 连接。

// 设置 SSE 请求头
HttpResponse httpResponse = TioRequestContext.getResponse();
httpResponse.addServerSentEventsHeader();
log.info("已经响应请求头");
步骤 3: 发送 SSE 消息

使用 SsePacket 来构造并发送 SSE 消息。在这个例子中,我们通过一个循环发送了 10 条消息。

new Thread(() -> {
  for (int i = 0; i < 10; i++) {
    // ... (循环内容)
  }
  // 手动移除连接
  Tio.remove(channelContext, "remove sse");
}).start();
步骤 4: 测试 SSE 功能

要测试你的 SSE 服务,你可以使用 curl 命令访问 SSE 路径:

curl http://localhost/sse

测试结果应该显示一系列格式化的 SSE 消息:

id: 0
event: message
data: This is message 0

id: 1
event: message
data: This is message 1

id: 2
event: message
data: This is message 2

每条消息都包含一个唯一的 id,事件类型 event,以及实际的消息内容 data

完整的 Cotnroller 代码示例

import com.litongjava.tio.boot.http.TioRequestContext;
import com.litongjava.tio.core.Tio;
import com.litongjava.tio.http.common.HttpRequest;
import com.litongjava.tio.http.common.HttpResponse;
import com.litongjava.tio.http.common.sse.SsePacket;
import com.litongjava.tio.http.server.annotation.RequestPath;
import com.litongjava.tio.http.server.util.SseUtils;
import com.litongjava.tio.server.ServerChannelContext;

import lombok.extern.slf4j.Slf4j;

@RequestPath("/sse")
@Slf4j
public class SseController {

  @RequestPath
  public HttpResponse conversation(HttpRequest request, ServerChannelContext channelContext) {

    HttpResponse httpResponse = TioRequestContext.getResponse();
    // 设置sse请求头
    httpResponse.addServerSentEventsHeader();
    // 手动发送消息到客户端,因为已经设置了sse的请求头,所以客户端的连接不会关闭
    Tio.send(channelContext, httpResponse);
    log.info("已经设置请求头");
    for (int i = 0; i < 10; i++) {
      String eventName = "message";
      String data = "消息 " + i;
      // byte[] bytes = data.getBytes(StandardCharsets.UTF_8);
      byte[] bytes = data.getBytes();
      SsePacket ssePacket = new SsePacket().id(i).event(eventName).data(bytes);
      // 再次向客户端发送消息
      Tio.send(channelContext, ssePacket);
      log.info("发送数据:{}", i);
      try {
        Thread.sleep(1000);
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }
    // 手动移除连接
    SseUtils.closeSeeConnection(channelContext);

    // 告诉处理器不要将消息发送给客户端
    return null;

  }
}

Hanlder 发送 SSE 数据

package com.imaginix.kimi.handler;
import com.litongjava.tio.core.ChannelContext;
import com.litongjava.tio.core.Tio;
import com.litongjava.tio.http.common.HttpRequest;
import com.litongjava.tio.http.common.HttpResponse;
import com.litongjava.tio.http.common.sse.SsePacket;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class TextMessageHander {

  public HttpResponse send(HttpRequest httpRequest) {

    ChannelContext channelContext = httpRequest.getChannelContext();

    // 设置sse请求头
    HttpResponse httpResponse = new HttpResponse(httpRequest).setServerSentEventsHeader();

    // 发送http响应包,告诉客户端保持连接
    Tio.send(channelContext, httpResponse);

    // 发送数据
    sendData(channelContext);

    // 告诉处理器不要将消息发送给客户端
    return new HttpResponse().setSend(false);
  }

  private void sendData(ChannelContext channelContext) {

    new Thread(() -> {
      for (int i = 0; i < 10; i++) {
        String eventName = "message";
        String data = "This is message " + i;
        SsePacket ssePacket = new SsePacket().id(i).event(eventName).data(data.getBytes());
        // 再次向客户端发送消息
        Tio.send(channelContext, ssePacket);
        log.info("发送数据:{}", i);
        try {
          Thread.sleep(1000);
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
      }
      // 手动移除连接
      Tio.remove(channelContext, "remove sse");
    }).start();
  }
}

直接发送 data 数据

  • 1.直接向向客户端发 data 数据,没有 event 部分
  • 2.继续发送其他数据
  • 3.发送一个 0 chunk,告诉客户端关闭连接(Http 标准)
  • 4.服务器关闭连接(防止过多占用服务器资源)
//发送数据
String message = "response body is null";
SseBytesPacket ssePacket = new SseBytesPacket(ChunkEncoder.encodeChunk(message.getBytes()));
Tio.send(channelContext, ssePacket);

// 告诉客户端,关闭连接
byte[] zeroChunk = ChunkEncoder.encodeChunk(new byte[0]);
SseBytesPacket endPacket = new SseBytesPacket(zeroChunk);
Tio.send(channelContext, endPacket);

try {
  // 给客户端足够的时间接受消息
  Thread.sleep(1000);
  Tio.remove(channelContext, "remove");
} catch (InterruptedException e) {
  e.printStackTrace();
}

浏览器请求 sse 数据

使用 fetch 请求

直接在浏览器的 Console 窗口运行即可

fetch('http://localhost/api/v1/message/text', {
  method: 'POST',
  headers: {
    'Content-Type': 'application/json'
  },
  body: JSON.stringify({ message: "Hello, SSE!" })
})
.then(response => {
  const reader = response.body.getReader();
  const decoder = new TextDecoder();

  function readStream() {
    reader.read().then(({ done, value }) => {
      if (done) {
        console.log('Stream complete');
        return;
      }
      const chunk = decoder.decode(value, { stream: true });
      console.log('Received chunk:', chunk);
      readStream();
    }).catch(error => {
      console.error('Error reading stream:', error);
    });
  }

  readStream();
})
.catch(error => {
  console.error('Fetch error:', error);
});

handler 配合线程池,读取音频文件并发送

编码

实现了一个 SSE 服务端,用于从资源文件中读取音频数据,并将其作为 Base64 编码的二进制数据块发送给客户端。它使用了一个线程池来异步处理数据传输,并在应用程序关闭时正确关闭线程池。

import java.util.concurrent.ExecutorService;

import com.litongjava.jfinal.aop.annotation.AConfiguration;
import com.litongjava.jfinal.aop.annotation.AInitialization;
import com.litongjava.tio.boot.server.TioBootServer;
import com.litongjava.tio.utils.thread.ThreadUtils;

@AConfiguration
public class ExecutorServiceConfig {

  @AInitialization
  public void config() {
    // 创建包含10个线程的线程池
    ExecutorService executor = ThreadUtils.newFixedThreadPool(10);

    // 项目关闭时,关闭线程池
    TioBootServer.me().addDestroyMethod(() -> {
      if (executor != null && !executor.isShutdown()) {
        executor.shutdown();
      }
    });

  }
}
import java.io.InputStream;
import java.util.Arrays;
import java.util.Base64;

import com.litongjava.tio.core.ChannelContext;
import com.litongjava.tio.core.Tio;
import com.litongjava.tio.http.common.HttpRequest;
import com.litongjava.tio.http.common.HttpResponse;
import com.litongjava.tio.http.server.sse.SsePacket;
import com.litongjava.tio.utils.hutool.ResourceUtil;
import com.litongjava.tio.utils.thread.ThreadUtils;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class MessageHander {

  public HttpResponse send(HttpRequest httpRequest) {

    ChannelContext channelContext = httpRequest.getChannelContext();

    // 设置sse请求头
    HttpResponse httpResponse = new HttpResponse(httpRequest).setServerSentEventsHeader();

    // 发送http响应包,告诉客户端保持连接
    Tio.send(channelContext, httpResponse);

    // 发送数据
    sendData(channelContext);

    // 告诉处理器不要将消息发送给客户端
    return new HttpResponse().setSend(false);
  }

  private void sendData(ChannelContext channelContext) {
    // 读取音频文件,准备音频流
    InputStream inputStream = ResourceUtil.getResourceAsStream("audio/01.mp3");

    ThreadUtils.getFixedThreadPool().submit(() -> {
      // 手动移除连接
      String eventName = "data";

      try {
        // 从InputStream读取数据并转换为字符串
        byte[] buffer = new byte[1024 * 10];
        int bytesRead;
        int id = 0;
        while ((bytesRead = inputStream.read(buffer)) != -1) {
          // 创建SsePacket并发送
          byte[] binaryData = Arrays.copyOf(buffer, bytesRead);
          String base64Data = Base64.getEncoder().encodeToString(binaryData);
          SsePacket ssePacket = new SsePacket().id(id).event(eventName).data(base64Data.getBytes());
          Tio.send(channelContext, ssePacket);
          id++;
          log.info("id:{}", id);
        }
      } catch (Exception e) {
        log.error("Error sending data: {}", e.getMessage());
      } finally {
        try {
          inputStream.close();
        } catch (Exception e) {
          log.error("Error closing input stream: {}", e.getMessage());
        }
        Tio.remove(channelContext, "remove sse");
      }
    });
  }
}

代码包含两个 Java 类: ExecutorServiceConfigMessageHander。以下是每个类的文档说明:

ExecutorServiceConfig

这个类用于配置和管理线程池。它使用了 JFinal AOP 注解进行配置。

  • @AConfiguration: 这个注解表示该类是一个配置类。
  • config() 方法:
    • 使用 ThreadUtils.newFixedThreadPool(10) 创建了一个包含 10 个线程的线程池。
    • 注册了一个 DestroyMethod,在应用程序关闭时关闭线程池。

MessageHander

这个类负责处理和发送音频数据。它实现了服务器发送事件 (Server-Sent Events, SSE) 的功能。

  • @Slf4j: 这个注解自动配置了一个 slf4j 日志记录器。
  • send(HttpRequest httpRequest) 方法:
    • HttpRequest 对象中获取 ChannelContext
    • 设置 SSE 响应头。
    • 发送 HTTP 响应包以保持连接。
    • 调用 sendData 方法发送数据。
    • 返回一个新的 HttpResponse 对象,并设置 setSend(false) 以避免重复发送响应。
  • sendData(ChannelContext channelContext) 方法:
    • 从资源文件中读取音频文件。
    • 在线程池中执行以下操作:
      • 初始化一个 10KB 的缓冲区。
      • 从输入流中循环读取数据。
      • 为每个读取的数据块:
        • 创建一个新的字节数组,仅包含实际读取的字节数。
        • 使用 Base64 编码器将字节数组编码为字符串。
        • 创建一个新的 SsePacket 对象,包含 ID、事件名称和编码后的数据。
        • 使用 Tio.send 发送 SsePacket 对象。
        • 增加 ID 计数器。
        • 记录日志信息。
      • 在循环结束时关闭输入流。
      • 移除 ChannelContext 连接。

总结

通过上述步骤,你可以在 tio-boot 框架中成功整合 SSE,从而使你的应用能够实时地向客户端推送数据。这种方法的优点在于其简单性和低延迟,非常适用于需要服务器实时更新的场景。