使用 tio-boot 内置 TCP 处理器处理 TCP 请求

tio-boot 框架内置了对 TCP 的支持,并默认在预设的端口上运行。更为特别的是,tio-boot 内置了协议识别算法,使得一个端口能够同时支持 TCP、HTTP 和 WebSocket 三种协议。当一个数据包到达 tio-boot-server 时,TioBootServerHandler 会根据内置的协议识别算法,选择合适的处理器来处理该协议的请求。

支持的协议处理器

  • TCP 协议: 使用自定义的 Handler 进行处理。
  • WebSocket 协议: 使用内置的 WebSocket 处理器。
  • HTTP 协议: 使用内置的 HTTP 处理器。

下面将介绍如何使用 tio-boot 内置的 TCP 功能。

1. 定义 DemoPacket

package com.litongjava.tio.boot.hello.tcp.packet;

import com.litongjava.tio.core.intf.Packet;

/**
 * socket消息包
 */
@SuppressWarnings("serial")
public class DemoPacket extends Packet {
  private byte[] body;

  public byte[] getBody() {
    return body;
  }

  public void setBody(byte[] body) {
    this.body = body;
  }
}

DemoPacket 类继承自 Packet,用于表示一个 TCP 消息包。它包含一个 byte[] body 字段,用于存储消息体的数据。

2. 实现 DemoHandler

package com.litongjava.tio.boot.hello.tcp.handler;

import java.nio.ByteBuffer;

import com.litongjava.tio.boot.hello.tcp.packet.DemoPacket;
import com.litongjava.tio.boot.tcp.ServerTcpHandler;
import com.litongjava.tio.core.ChannelContext;
import com.litongjava.tio.core.Tio;
import com.litongjava.tio.core.TioConfig;
import com.litongjava.tio.core.exception.TioDecodeException;
import com.litongjava.tio.core.intf.Packet;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class DemoHandler implements ServerTcpHandler {

  public Packet decode(ByteBuffer buffer, int limit, int position, int readableLength, ChannelContext channelContext)
      throws TioDecodeException {
    log.info("buffer:{}", buffer);
    byte[] bytes = new byte[readableLength];
    buffer.get(bytes);
    DemoPacket imPackage = new DemoPacket();
    imPackage.setBody(bytes);
    return imPackage;
  }

  public ByteBuffer encode(Packet packet, TioConfig tioConfig, ChannelContext channelContext) {
    DemoPacket helloPacket = (DemoPacket) packet;
    byte[] body = helloPacket.getBody();
    int bodyLength = body.length;
    log.info("encode:{}", bodyLength);
    ByteBuffer buffer = ByteBuffer.allocate(bodyLength);
    buffer.order(tioConfig.getByteOrder());
    buffer.put(body);
    return buffer;
  }

  public void handler(Packet packet, ChannelContext channelContext) throws Exception {
    DemoPacket packingPacket = (DemoPacket) packet;
    byte[] body = packingPacket.getBody();
    if (body == null) {
      return;
    }
    String string = new String(body);
    log.info("received:{}", string);
    String sendMessage = "echo:" + string;
    log.info("sendMessage:{}", sendMessage);
    byte[] bytes = sendMessage.getBytes();
    DemoPacket responsePacket = new DemoPacket();
    responsePacket.setBody(bytes);
    log.info("开始发送响应");
    Tio.send(channelContext, responsePacket);
    log.info("响应完成");
  }
}

DemoHandler 类实现了 ServerTcpHandler 接口,主要负责:

  • 解码 (decode): 从 ByteBuffer 中读取数据并将其转换为 DemoPacket
  • 编码 (encode): 将 DemoPacket 的数据转换为 ByteBuffer,以便发送。
  • 处理 (handler): 处理接收到的消息,并在处理完毕后向客户端发送响应。

3. 实现 DemoListener

package com.litongjava.tio.boot.hello.tcp.listener;

import com.litongjava.tio.boot.tcp.ServerHanlderListener;
import com.litongjava.tio.core.ChannelContext;
import com.litongjava.tio.core.intf.Packet;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class DemoListener implements ServerHanlderListener {

  public void onAfterConnected(ChannelContext channelContext, boolean isConnected, boolean isReconnect)
      throws Exception {
    log.info("{},{},{}", channelContext, isConnected, isReconnect);
  }

  public void onAfterDecoded(ChannelContext channelContext, Packet packet, int packetSize) throws Exception {
    log.info("{},{},{}", channelContext, packet, packetSize);
  }

  public void onAfterReceivedBytes(ChannelContext channelContext, int receivedBytes) throws Exception {
    log.info("{},{}", channelContext, receivedBytes);
  }

  public void onAfterSent(ChannelContext channelContext, Packet packet, boolean isSentSuccess) throws Exception {
    log.info("{},{}", channelContext, packet);
  }

  public void onAfterHandled(ChannelContext channelContext, Packet packet, long cost) throws Exception {
    log.info("{},{},{}", channelContext, packet, cost);
  }

  public void onBeforeClose(ChannelContext channelContext, Throwable throwable, String remark, boolean isRemove)
      throws Exception {
    log.info("{},{},{},{}", channelContext, throwable, remark, isRemove);
  }

  public boolean onHeartbeatTimeout(ChannelContext channelContext, Long interval, int heartbeatTimeoutCount) {
    log.info("{},{},{}", channelContext, interval, heartbeatTimeoutCount);
    return false;
  }
}

DemoListener 类实现了 ServerHanlderListener 接口,用于监听服务器在处理 TCP 连接时的各个事件,如连接建立、消息解码、接收字节数、消息发送、消息处理完毕、连接关闭等。

4. 配置 TioBootServerConfig

package com.litongjava.tio.boot.hello.tcp.config;

import com.litongjava.jfinal.aop.annotation.AInitialization;
import com.litongjava.jfinal.aop.annotation.BeforeStartConfiguration;
import com.litongjava.tio.boot.hello.tcp.handler.DemoHandler;
import com.litongjava.tio.boot.hello.tcp.listener.DemoListener;
import com.litongjava.tio.boot.server.TioBootServer;
import com.litongjava.tio.boot.tcp.ServerTcpHandler;

@BeforeStartConfiguration
public class TioBootServerConfig {

  @AInitialization
  public void config() {
    ServerTcpHandler demoHandler = new DemoHandler();
    TioBootServer.me().setServerTcpHandler(demoHandler);

    DemoListener demoListener = new DemoListener();
    TioBootServer.me().setServerAioListener(demoListener);
  }
}

TioBootServerConfig 类负责在服务器启动之前完成 ServerTcpHandlerServerListener 的初始化配置。通过使用 @BeforeStartConfiguration@AInitialization 注解,确保 DemoHandlerDemoListener 被正确设置到服务器中。

5. 运行日志示例

以下是服务器运行时的日志示例,展示了服务器如何处理来自客户端的 TCP 请求:

2024-01-29 12:01:09.841 [tio-group-6] INFO  DemoListener.onAfterConnected:13 - server:0.0.0.0:80, client:127.0.0.1:2820,true,false
2024-01-29 12:01:09.842 [tio-group-7] INFO  DemoListener.onAfterReceivedBytes:21 - server:0.0.0.0:80, client:127.0.0.1:2820,2
2024-01-29 12:01:09.842 [tio-group-7] INFO  DemoHandler.decode:20 - buffer:java.nio.HeapByteBuffer[pos=0 lim=2 cap=30720]
2024-01-29 12:01:09.842 [tio-group-7] INFO  DemoListener.onAfterDecoded:17 - server:0.0.0.0:80, client:127.0.0.1:2820,com.litongjava.tio.boot.hello.tcp.packet.DemoPacket@27114133,2
2024-01-29 12:01:09.842 [tio-group-7] INFO  DemoHandler.handler:53 - received:gu
2024-01-29 12:01:09.842 [tio-group-7] INFO  DemoHandler.handler:56 - sendMessage:echo:gu
2024-01-29 12:01:09.842 [tio-group-7] INFO  DemoHandler.handler:62 - 开始发送响应
2024-01-29 12:01:09.843 [tio-group-7] INFO  DemoHandler.handler:64 - 响应完成
2024-01-29 12:01:09.843 [tio-worker-4] INFO  DemoHandler.encode:35 - encode:7
202

4-01-29 12:01:09.843 [tio-group-7] INFO  DemoListener.onAfterHandled:29 - server:0.0.0.0:80, client:127.0.0.1:2820,com.litongjava.tio.boot.hello.tcp.packet.DemoPacket@27114133,0
2024-01-29 12:01:09.844 [tio-group-8] INFO  DemoListener.onAfterSent:25 - server:0.0.0.0:80, client:127.0.0.1:2820,com.litongjava.tio.boot.hello.tcp.packet.DemoPacket@5da82e61
2024-01-29 12:01:09.845 [tio-worker-5] INFO  DemoListener.onBeforeClose:44 - server:0.0.0.0:80, client:127.0.0.1:2820,null,对方关闭了连接,true

6. 使用 ByteBufferPacket

如果你不需要手动处理 ByteBuffer 的编码和解码,可以使用 tio-boot 提供的 ByteBufferPacket 类。你可以直接将 ByteBuffer 添加到 ByteBufferPacket 中,而无需自己管理编码和解码的细节。

import com.litongjava.tio.boot.tcp.ByteBufferPacket;

测试源码地址

你可以在 GitHub 上找到完整的测试源码,了解更多详细实现和配置: GitHub 地址open in new window

代码解释

1. DemoPacket

这是一个继承自 Packet 的类,用于表示一个 socket 消息包。它主要包含一个 byte[] body 字段来存储消息体的数据。

2. DemoHandler

这个类实现了 ServerTcpHandler 接口,用于处理 TCP 协议。它主要包含三个方法:

  • decode: 解码方法,用于从 ByteBuffer 中读取数据并转换为 DemoPacket
  • encode: 编码方法,将 DemoPacket 的数据转换为 ByteBuffer,以便于传输。
  • handler: 处理方法,用于接收解码后的 DemoPacket,执行业务逻辑,并响应客户端。

3. DemoTioServerListener

这个类实现了 ServerListener 接口,用于监听服务器在处理 TCP 连接时的不同事件如连接建立、消息解码、消息接收、消息发送、处理完成和连接关闭等。 。这个监听器允许你在连接的生命周期中的关键时刻进行自定义处理。这对于监控、日志记录、资源管理和异常处理等方面非常有用。 下面是对 ServerListener 中各个方法的详细解释:

1. onAfterConnected

  • 作用: 当一个新的连接建立后调用。
  • 参数:
    • ChannelContext: 表示当前连接的上下文。
    • isConnected: 表示是否成功连接。
    • isReconnect: 表示是否为重连。
  • 实现逻辑: 在这个方法中,通常用于记录连接建立的信息,或者进行一些初始化操作。

2. onAfterDecoded

  • 作用: 在消息解码后调用。
  • 参数:
    • ChannelContext: 当前连接的上下文。
    • Packet: 解码后的数据包。
    • packetSize: 数据包的大小。
  • 实现逻辑: 用于处理解码后的数据,如记录日志或进行一些验证。

3. onAfterReceivedBytes

  • 作用: 在接收到数据字节后调用。
  • 参数:
    • ChannelContext: 当前连接的上下文。
    • receivedBytes: 接收到的字节数。
  • 实现逻辑: 可用于监控数据流量,如记录接收到的总字节数。

4. onAfterSent

  • 作用: 在发送数据包后调用。
  • 参数:
    • ChannelContext: 当前连接的上下文。
    • Packet: 发送的数据包。
    • isSentSuccess: 是否发送成功。
  • 实现逻辑: 用于确认数据发送的状态,可以用来记录日志或处理发送失败的情况。

5. onAfterHandled

  • 作用: 在消息处理完毕后调用。
  • 参数:
    • ChannelContext: 当前连接的上下文。
    • Packet: 被处理的数据包。
    • cost: 处理消耗的时间。
  • 实现逻辑: 用于记录处理消息所需的时间,或进行一些后处理工作。

6. onBeforeClose

  • 作用: 在连接关闭前触发。
  • 参数:
    • ChannelContext: 当前连接的上下文。
    • Throwable: 引起关闭的异常,可能为空。
    • remark: 备注信息,可能为空。
    • isRemove: 是否移除。
  • 实现逻辑: 在连接即将关闭时被调用,通常用于清理资源或记录日志。

7. onHeartbeatTimeout

  • 作用: 心跳超时时调用。
  • 参数:
    • ChannelContext: 当前连接的上下文。
    • interval: 已经多久没有收发消息了,单位是毫秒。
    • heartbeatTimeoutCount: 心跳超时次数。
  • 实现逻辑: 当连接心跳超时时调用,可以根据需要决定是否关闭连接。

4. TioBootServerConfig

  • @BeforeStartConfiguration:使用该注解标记的配置类会自在启动服务器之前执行,这里的功能是在服务器启动之前完成 ServerTcpHandler 和 ServerListener 初始化.

总结

整体上,这段代码展示了如何使用 tio-boot 框架来构建一个同时支持 TCP、HTTP 和 WebSocket 协议的服务器。它通过定义消息包的格式(DemoPacket)、处理逻辑(DemoHandler)、事件监听(DemoListener)以及应用程序启动配置(TioBootServerConfig),实现了一个基本的网络通信服务器。