内置 TCP 处理器

简介

tio-boot 框架内置了对 TCP 的支持.tio-boot 内置了协议识别算法,使得一个端口能够同时支持 TCPHTTPWebSocket 三种协议。当一个数据包到达 tio-boot-server 时,TioBootServerHandler 会根据内置的协议识别算法,选择合适的处理器来处理该协议的请求。

支持的协议处理器

  • TCP 协议:使用自定义的 Handler 进行处理。
  • WebSocket 协议:使用内置的 WebSocket 处理器。
  • HTTP 协议:使用内置的 HTTP 处理器。

下面将介绍如何使用 tio-boot 内置的 TCP 功能,构建一个简单的 TCP 服务端应用。

使用 TCP 处理器示例

1. 添加 Maven 依赖

首先,在项目的 pom.xml 文件中添加必要的依赖:

<properties>
  <java.version>1.8</java.version>
  <tio.boot.version>1.7.4</tio.boot.version>
  <jfinal-aop.version>1.3.3</jfinal-aop.version>
  <lombok.version>1.18.24</lombok.version>
</properties>

<dependencies>
  <!-- tio-boot 核心依赖 -->
  <dependency>
    <groupId>com.litongjava</groupId>
    <artifactId>tio-boot</artifactId>
    <version>${tio.boot.version}</version>
  </dependency>

  <!-- JFinal AOP 依赖 -->
  <dependency>
    <groupId>com.litongjava</groupId>
    <artifactId>jfinal-aop</artifactId>
    <version>${jfinal-aop.version}</version>
  </dependency>

  <!-- 日志依赖 -->
  <dependency>
    <groupId>ch.qos.logback</groupId>
    <artifactId>logback-classic</artifactId>
    <version>1.2.3</version>
  </dependency>

  <!-- Lombok 依赖 -->
  <dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>${lombok.version}</version>
    <scope>provided</scope>
  </dependency>

  <!-- 热部署依赖,可选 -->
  <dependency>
    <groupId>com.litongjava</groupId>
    <artifactId>hotswap-classloader</artifactId>
    <version>1.2.6</version>
  </dependency>
</dependencies>

说明:

  • tio-boottio-boot 框架的核心依赖,用于提供 TCP 服务的基本功能。
  • JFinal AOP:轻量级的 AOP 框架,用于面向切面编程。
  • Logback:日志框架,用于记录日志信息。
  • Lombok:简化 Java 代码的工具,可自动生成 getter/setter 等方法。
  • hotswap-classloader:热部署工具,可选。

2. 定义 DemoPacket

接下来,创建一个用于表示 TCP 消息包的 DemoPacket 类:

package com.litongjava.tio.boot.hello.tcp.packet;

import com.litongjava.aio.Packet;

/**
 * TCP 消息包类
 */
public class DemoPacket extends Packet {
  private static final long serialVersionUID = 1L;

  private byte[] body;

  public byte[] getBody() {
    return body;
  }

  public void setBody(byte[] body) {
    this.body = body;
  }
}

说明:

  • DemoPacket 类继承自 Packet,用于表示一个 TCP 消息包。
  • 包含一个 byte[] body 字段,用于存储消息体的数据。

3. 实现 DemoHandler

DemoHandler 负责处理 TCP 消息的编码、解码和业务逻辑处理。

package com.litongjava.tio.boot.hello.tcp.handler;

import java.nio.ByteBuffer;

import com.litongjava.aio.Packet;
import com.litongjava.tio.boot.hello.tcp.packet.DemoPacket;
import com.litongjava.tio.core.ChannelContext;
import com.litongjava.tio.core.Tio;
import com.litongjava.tio.core.TioConfig;
import com.litongjava.tio.core.exception.TioDecodeException;
import com.litongjava.tio.server.intf.ServerAioHandler;

import lombok.extern.slf4j.Slf4j;

/**
 * TCP 服务器处理器
 */
@Slf4j
public class DemoHandler implements ServerAioHandler {

  @Override
  public Packet decode(ByteBuffer buffer, int limit, int position, int readableLength, ChannelContext channelContext)
      throws TioDecodeException {
    log.info("开始解码,buffer: {}", buffer);

    // 可读字节数
    int readableBytes = buffer.remaining();

    // 读取所有可读字节
    byte[] bytes = new byte[readableBytes];
    buffer.get(bytes);

    // 封装为 DemoPacket
    DemoPacket packet = new DemoPacket();
    packet.setBody(bytes);
    return packet;
  }

  @Override
  public ByteBuffer encode(Packet packet, TioConfig tioConfig, ChannelContext channelContext) {
    DemoPacket demoPacket = (DemoPacket) packet;
    byte[] body = demoPacket.getBody();

    if (body == null) {
      log.warn("消息体为空,无法编码");
      return null;
    }

    // 创建 ByteBuffer,容量为消息体长度
    ByteBuffer buffer = ByteBuffer.allocate(body.length);
    // 设置字节序
    buffer.order(tioConfig.getByteOrder());
    // 写入消息体
    buffer.put(body);
    return buffer;
  }

  @Override
  public void handler(Packet packet, ChannelContext channelContext) throws Exception {
    DemoPacket demoPacket = (DemoPacket) packet;
    byte[] body = demoPacket.getBody();

    if (body == null) {
      log.warn("接收到的消息体为空");
      return;
    }

    String message = new String(body, "UTF-8");
    log.info("接收到消息:{}", message);

    // 构建响应消息
    String responseMessage = "echo: " + message;
    log.info("发送响应消息:{}", responseMessage);

    DemoPacket responsePacket = new DemoPacket();
    responsePacket.setBody(responseMessage.getBytes("UTF-8"));

    // 发送响应
    Tio.send(channelContext, responsePacket);
  }
}

说明:

  • 解码 (decode):从 ByteBuffer 中读取数据并封装为 DemoPacket
  • 编码 (encode):将 DemoPacket 的消息体写入 ByteBuffer,用于发送。
  • 处理 (handler):处理接收到的消息,并构建响应消息发送给客户端。

4. 实现 DemoListener

DemoListener 用于监听服务器的各类事件,例如连接建立、消息接收、消息发送等。

package com.litongjava.tio.boot.hello.tcp.listener;

import com.litongjava.aio.Packet;
import com.litongjava.tio.core.ChannelContext;
import com.litongjava.tio.server.intf.ServerAioListener;

import lombok.extern.slf4j.Slf4j;

/**
 * 服务器事件监听器
 */
@Slf4j
public class DemoListener implements ServerAioListener {

  @Override
  public void onAfterConnected(ChannelContext channelContext, boolean isConnected, boolean isReconnect)
      throws Exception {
    log.info("连接建立后,channelContext: {}, isConnected: {}, isReconnect: {}", channelContext, isConnected, isReconnect);
  }

  @Override
  public void onAfterDecoded(ChannelContext channelContext, Packet packet, int packetSize) throws Exception {
    log.info("消息解码后,channelContext: {}, packet: {}, packetSize: {}", channelContext, packet, packetSize);
  }

  @Override
  public void onAfterReceivedBytes(ChannelContext channelContext, int receivedBytes) throws Exception {
    log.info("接收字节后,channelContext: {}, receivedBytes: {}", channelContext, receivedBytes);
  }

  @Override
  public void onAfterSent(ChannelContext channelContext, Packet packet, boolean isSentSuccess) throws Exception {
    log.info("消息发送后,channelContext: {}, packet: {}, isSentSuccess: {}", channelContext, packet, isSentSuccess);
  }

  @Override
  public void onAfterHandled(ChannelContext channelContext, Packet packet, long cost) throws Exception {
    log.info("消息处理后,channelContext: {}, packet: {}, 耗时: {} ms", channelContext, packet, cost);
  }

  @Override
  public void onBeforeClose(ChannelContext channelContext, Throwable throwable, String remark, boolean isRemove)
      throws Exception {
    log.info("连接关闭前,channelContext: {}, throwable: {}, remark: {}, isRemove: {}", channelContext, throwable, remark, isRemove);
  }

  @Override
  public boolean onHeartbeatTimeout(ChannelContext channelContext, Long interval, int heartbeatTimeoutCount) {
    log.warn("心跳超时,channelContext: {}, interval: {} ms, heartbeatTimeoutCount: {}", channelContext, interval, heartbeatTimeoutCount);
    // 返回 true 表示继续保持连接,返回 false 则关闭连接
    return false;
  }
}

说明:

  • 通过实现 ServerAioListener 接口的各个方法,可以监听服务器的各种事件,方便进行日志记录和特殊处理。

5. 配置 TioBootServerConfig

在服务器启动之前,配置自定义的 HandlerListener

package com.litongjava.tio.boot.hello.tcp.config;

import com.litongjava.annotation.AInitialization;
import com.litongjava.annotation.BeforeStartConfiguration;
import com.litongjava.tio.boot.hello.tcp.handler.DemoHandler;
import com.litongjava.tio.boot.hello.tcp.listener.DemoListener;
import com.litongjava.tio.boot.server.TioBootServer;

/**
 * 服务器配置类
 */
@BeforeStartConfiguration
public class TioBootServerConfig {

  @Initialization
  public void config() {
    TioBootServer server = TioBootServer.me();

    // 设置自定义的 Handler
    DemoHandler demoHandler = new DemoHandler();
    server.setServerAioHandler(demoHandler);

    // 设置自定义的 Listener
    DemoListener demoListener = new DemoListener();
    server.setServerAioListener(demoListener);
  }
}

说明:

  • 使用 @BeforeStartConfiguration 注解,确保配置在服务器启动之前进行。
  • @Initialization 注解表示该方法在初始化阶段执行。
  • 将自定义的 DemoHandlerDemoListener 设置到 TioBootServer 中。

6. 启动类

创建应用程序的主类,启动服务器。

package com.litongjava.tio.boot.hello.tcp;

import com.litongjava.annotation.AComponentScan;
import com.litongjava.tio.boot.TioApplication;

/**
 * 应用程序启动类
 */
@AComponentScan
public class ServerTcpApp {

  public static void main(String[] args) {
    long start = System.currentTimeMillis();
    TioApplication.run(ServerTcpApp.class, args);
    long end = System.currentTimeMillis();
    System.out.println("服务器启动耗时:" + (end - start) + " ms");
  }
}

说明:

  • 使用 @AComponentScan 注解,开启组件扫描,自动扫描配置类。
  • 调用 TioApplication.run() 方法启动服务器。

7. 运行日志示例

当服务器运行并接收到客户端请求时,日志可能如下所示:

2024-01-29 12:01:09.841 [tio-group-6] INFO  DemoListener - 连接建立后,channelContext: server:0.0.0.0:80, client:127.0.0.1:2820, isConnected: true, isReconnect: false
2024-01-29 12:01:09.842 [tio-group-7] INFO  DemoListener - 接收字节后,channelContext: server:0.0.0.0:80, client:127.0.0.1:2820, receivedBytes: 2
2024-01-29 12:01:09.842 [tio-group-7] INFO  DemoHandler - 开始解码,buffer: java.nio.HeapByteBuffer[pos=0 lim=2 cap=30720]
2024-01-29 12:01:09.842 [tio-group-7] INFO  DemoListener - 消息解码后,channelContext: server:0.0.0.0:80, client:127.0.0.1:2820, packet: com.litongjava.tio.boot.hello.tcp.packet.DemoPacket@27114133, packetSize: 2
2024-01-29 12:01:09.842 [tio-group-7] INFO  DemoHandler - 接收到消息:hi
2024-01-29 12:01:09.842 [tio-group-7] INFO  DemoHandler - 发送响应消息:echo: hi
2024-01-29 12:01:09.842 [tio-group-7] INFO  DemoHandler - 开始发送响应
2024-01-29 12:01:09.843 [tio-group-7] INFO  DemoHandler - 响应完成
2024-01-29 12:01:09.843 [tio-worker-4] INFO  DemoHandler - 编码消息,长度: 8
2024-01-29 12:01:09.843 [tio-group-7] INFO  DemoListener - 消息处理后,channelContext: server:0.0.0.0:80, client:127.0.0.1:2820, packet: com.litongjava.tio.boot.hello.tcp.packet.DemoPacket@27114133, 耗时: 0 ms
2024-01-29 12:01:09.844 [tio-group-8] INFO  DemoListener - 消息发送后,channelContext: server:0.0.0.0:80, client:127.0.0.1:2820, packet: com.litongjava.tio.boot.hello.tcp.packet.DemoPacket@5da82e61, isSentSuccess: true
2024-01-29 12:01:09.845 [tio-worker-5] INFO  DemoListener - 连接关闭前,channelContext: server:0.0.0.0:80, client:127.0.0.1:2820, throwable: null, remark: 对方关闭了连接, isRemove: true

说明:

  • 日志展示了服务器处理请求的完整流程,从连接建立、消息接收、消息解码、处理、响应发送到连接关闭。
  • 可以根据日志信息进行调试和问题定位。

8. 示例源码地址

完整的测试源码可以在 GitHub 上获取,方便进行深入学习和参考:

GitHub 地址open in new window

9. 使用 ByteBufferPacket

如果你希望简化编码和解码的过程,不想手动处理 ByteBuffer,可以使用 tio-boot 提供的 ByteBufferPacket 类。它允许你直接将数据写入 ByteBuffer,减少编码和解码的繁琐步骤。

import com.litongjava.aio.ByteBufferPacket;

示例:

DemoHandler 中,可以使用 ByteBufferPacket 代替自定义的 DemoPacket

// 创建 ByteBufferPacket
ByteBufferPacket responsePacket = new ByteBufferPacket();

// 将响应消息写入 ByteBufferPacket
responsePacket.setByteBuffer(ByteBuffer.wrap(responseMessage.getBytes("UTF-8")));

// 发送响应
Tio.send(channelContext, responsePacket);

说明:

  • ByteBufferPacket 已经封装了对 ByteBuffer 的处理,你只需要关注消息体本身。

总结:

本文介绍了如何使用 tio-boot 框架内置的 TCP 处理器,构建一个简单的 TCP 服务器应用。通过自定义 PacketHandlerListener,实现了消息的编码、解码和业务逻辑处理。并提供了详细的代码示例和解释,帮助你快速上手开发。