Netty WebSocket Server 二进制数据传输

简介

在之前的章节中,我们已经完成了以下两部分操作:

  • 编写了一个 WebSocket 服务器并实现了聊天服务功能。
  • 使用 protoc 工具生成了 Java 的协议缓冲区(Protobuf)文件。

然而,目前客户端和服务器之间传输的数据是以 JSON 格式进行的。本章将介绍如何在客户端和服务器之间通过二进制数据进行通信。

添加依赖

<dependency>
  <groupId>com.google.protobuf</groupId>
  <artifactId>protobuf-java</artifactId>
  <version>4.26.1</version>
</dependency>

<dependency>
  <groupId>io.netty</groupId>
  <artifactId>netty-all</artifactId>
  <version>4.1.65.Final</version>
</dependency>

修改 ChatHandler 代码

以下是 ChatHandler 的代码修改,以支持二进制数据的处理:

package com.litongjava.im.netty.handler;

import java.nio.charset.StandardCharsets;
import java.util.Map;

import com.alibaba.fastjson2.JSONObject;
import com.google.protobuf.InvalidProtocolBufferException;
import com.litongjava.im.netty.protobuf.ChatPacketOuter;
import com.litongjava.im.netty.protobuf.ChatPacketOuter.ChatPacket;
import com.litongjava.im.netty.utils.MessageUtils;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import lombok.extern.slf4j.Slf4j;

/**
 * Netty 消息处理器
 */
@Slf4j
public class ChatHandler extends SimpleChannelInboundHandler<Object> {

  @Override
  protected void channelRead0(ChannelHandlerContext channelHandlerContext, Object msg) throws Exception {
    if (msg == null) {
      log.info("接收到的数据为空");
      return;
    }
    if (msg instanceof FullHttpRequest) {
      log.info("收到 HTTP 请求");
    } else if (msg instanceof TextWebSocketFrame) { // 处理文本数据
      processText(channelHandlerContext, msg);
    } else if (msg instanceof BinaryWebSocketFrame) { // 处理二进制数据
      processBinary(channelHandlerContext, msg);
    }
  }

  private void processBinary(ChannelHandlerContext channelHandlerContext, Object msg) {
    BinaryWebSocketFrame binary = (BinaryWebSocketFrame) msg;
    ByteBuf buf = binary.content();
    byte[] array = new byte[buf.readableBytes()];
    buf.readBytes(array);

    ChatPacket packet;
    try {
      packet = ChatPacketOuter.ChatPacket.parseFrom(array);
    } catch (InvalidProtocolBufferException e) {
      log.error("解析二进制数据失败", e);
      return;
    }

    int code = packet.getCode();
    String content = packet.getMsg();
    JSONObject msgJson = JSONObject.parseObject(content);
    if (code == 101) {
      // 假设用户ID在消息中以 "userId" 传递
      Integer userId = msgJson.getInteger("userId");
      MessageUtils.nettyUserMap.put(userId, channelHandlerContext.channel());
      log.info("Netty 客户端 " + userId + " 加入成功!");

      // 向客户端发送欢迎消息或确认信息
      String welcomeMessage = "用户 " + userId + " 加入成功!";
      ByteBuf welcomeBuf = Unpooled.copiedBuffer(welcomeMessage.getBytes(StandardCharsets.UTF_8));
      channelHandlerContext.channel().writeAndFlush(new BinaryWebSocketFrame(welcomeBuf));

    } else {
      Integer toUserId = msgJson.getInteger("toUserId"); // 假设接收方用户ID在消息中传递
      Channel toUserChannel = MessageUtils.nettyUserMap.get(toUserId);

      if (toUserChannel != null) {
        ByteBuf contentBuf = Unpooled.copiedBuffer(content.getBytes(StandardCharsets.UTF_8));
        toUserChannel.writeAndFlush(new BinaryWebSocketFrame(contentBuf));
      } else {
        log.info("用户 " + toUserId + " 未连接。");
      }
    }
  }

  private void processText(ChannelHandlerContext channelHandlerContext, Object msg) {
    TextWebSocketFrame textWebSocketFrame = (TextWebSocketFrame) msg;
    String content = textWebSocketFrame.text();

    JSONObject msgJson = JSONObject.parseObject(content);
    Integer code = msgJson.getInteger("code");
    if (code == 101) {
      // 假设用户ID在消息中以 "userId" 传递
      Integer userId = msgJson.getInteger("userId");
      MessageUtils.nettyUserMap.put(userId, channelHandlerContext.channel());
      log.info("Netty 客户端 " + userId + " 加入成功!");

      // 向客户端发送欢迎消息或确认信息
      String welcomeMessage = "用户 " + userId + " 加入成功!";
      channelHandlerContext.channel().writeAndFlush(new TextWebSocketFrame(welcomeMessage));

    } else {
      Integer toUserId = msgJson.getInteger("toUserId"); // 假设接收方用户ID在消息中传递
      Channel toUserChannel = MessageUtils.nettyUserMap.get(toUserId);

      if (toUserChannel != null) {
        toUserChannel.writeAndFlush(new TextWebSocketFrame(content));
      } else {
        log.info("用户 " + toUserId + " 未连接。");
      }
    }
  }

  @Override
  public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
    log.info("新的连接: {}", ctx);
  }

  @Override
  public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
    // 获取当前断开连接的 Channel
    Channel channel = ctx.channel();

    // 迭代 nettyUserMap,找到对应的用户ID并移除
    Integer userIdToRemove = null;
    for (Map.Entry<Integer, Channel> entry : MessageUtils.nettyUserMap.entrySet()) {
      if (entry.getValue().equals(channel)) {
        userIdToRemove = entry.getKey();
        break;
      }
    }

    // 移除找到的用户ID
    if (userIdToRemove != null) {
      MessageUtils.nettyUserMap.remove(userIdToRemove);
      log.info("用户 " + userIdToRemove + " 已断开连接并移除。");
    } else {
      log.info("未找到对应的用户ID。");
    }
  }
}

测试

使用开发工具将以下 JSON 格式数据转换为二进制格式,并发送到服务器进行测试:

{
  "code": 101,
  "userId": 1
}
{
  "code": 101,
  "userId": 2
}
{
  "code": 200,
  "fromUserId": 1,
  "toUserId": 2,
  "message": "Hello, this is a message from Client 1 to Client 2."
}
{
  "code": 200,
  "fromUserId": 2,
  "toUserId": 1,
  "message": "Hello, this is a message from Client 2 to Client 1."
}

通过以上步骤,你可以将客户端和服务器之间的消息从 JSON 转换为二进制格式,进一步提高传输效率并减少数据量。这种方法特别适用于高性能应用场景,如实时通信或大规模数据交换。