使用 tio-core 添加独立 tcp 端口的 处理 tcp 请求

使用 tio-boot 内置 tio-core,可以可以 tcp-core 启动 tcp 服务,处理 tcp 数据, tio-boot 提供了两种处理 tcp 数据数据的方式

  • 1.t-io core 需要使用单独端口
  • 2.使用 tio-boot tcphander,无须独立的端口,复用 tio-boot server 的端口,同时支持 http 协议和 websocket 协议

启动 tcp-server

DemoPacket

import com.litongjava.tio.core.intf.Packet;

/**
* socket消息包
*/
@SuppressWarnings("serial")
public class DemoPacket extends Packet {
  private byte[] body;

  public byte[] getBody() {
    return body;
  }

  public void setBody(byte[] body) {
    this.body = body;
  }
}

DemoTioServerListener

import com.litongjava.tio.core.ChannelContext;
import com.litongjava.tio.core.Tio;
import com.litongjava.tio.core.intf.Packet;
import com.litongjava.tio.server.intf.ServerAioListener;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class DemoTioServerListener implements ServerAioListener {
  public void onAfterConnected(ChannelContext channelContext, boolean isConnected, boolean isReconnect) throws Exception {
  }

  public void onAfterDecoded(ChannelContext channelContext, Packet packet, int packetSize) throws Exception {
  }

  public void onAfterReceivedBytes(ChannelContext channelContext, int receivedBytes) throws Exception {
  }

  public void onAfterSent(ChannelContext channelContext, Packet packet, boolean isSentSuccess) throws Exception {
  }

  public void onAfterHandled(ChannelContext channelContext, Packet packet, long cost) throws Exception {
  }

  /**
   * 连接关闭前触发本方法
   *
   * @param channelContext        the channelcontext
   * @param throwable the throwable 有可能为空
   * @param remark    the remark 有可能为空
   * @param isRemove
   * @throws Exception
   */

  public void onBeforeClose(ChannelContext channelContext, Throwable throwable, String remark, boolean isRemove) throws Exception {
    log.info("关闭后清除认证信息");
    Tio.unbindToken(channelContext);
  }

  /**
   * @param channelContext
   * @param interval              已经多久没有收发消息了,单位:毫秒
   * @param heartbeatTimeoutCount 心跳超时次数,第一次超时此值是1,以此类推。此值被保存在:channelContext.stat.heartbeatTimeoutCount
   * @return 返回true,那么服务器则不关闭此连接;返回false,服务器将按心跳超时关闭该连接
   */
  public boolean onHeartbeatTimeout(ChannelContext channelContext, Long interval, int heartbeatTimeoutCount) {
    log.info("心跳超时");
    Tio.unbindToken(channelContext);
    return false;
  }
}

DemoTioServerHandler.java

import java.nio.ByteBuffer;

import com.litongjava.tio.core.ChannelContext;
import com.litongjava.tio.core.Tio;
import com.litongjava.tio.core.TioConfig;
import com.litongjava.tio.core.exception.TioDecodeException;
import com.litongjava.tio.core.intf.Packet;
import com.litongjava.tio.server.intf.ServerAioHandler;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class DemoTioServerHandler implements ServerAioHandler {

  public Packet decode(ByteBuffer buffer, int limit, int position, int readableLength, ChannelContext channelContext)
      throws TioDecodeException {
    log.info("buffer:{}", buffer);
    // 获取由ByteBuffer支持的字节数组
    byte[] bytes = new byte[readableLength];
    buffer.get(bytes);
    // 封装为DemoPacket
    DemoPacket imPackage = new DemoPacket();
    imPackage.setBody(bytes);
    return imPackage;
  }

  public ByteBuffer encode(Packet packet, TioConfig tioConfig, ChannelContext channelContext) {
    DemoPacket helloPacket = (DemoPacket) packet;
    byte[] body = helloPacket.getBody();
    // ByteBuffer的总长度是消息体长度
    int bodyLength = body.length;
    log.info("encode:{}", bodyLength);

    // 创建一个新的ByteBuffer
    ByteBuffer buffer = ByteBuffer.allocate(bodyLength);
    // 设置字节序
    buffer.order(tioConfig.getByteOrder());
    // 消息消息体
    buffer.put(body);
    return buffer;
  }

  public void handler(Packet packet, ChannelContext channelContext) throws Exception {
    DemoPacket packingPacket = (DemoPacket) packet;
    byte[] body = packingPacket.getBody();
    if (body == null) {
      return;
    }
    String string = new String(body);
    log.info("received:{}", string);
    // 响应数据
    String sendMessage = "收到了你的消息,你的消息是:" + string;
    log.info("sendMessage:{}", sendMessage);
    byte[] bytes = sendMessage.getBytes();
    // 响应包
    DemoPacket responsePacket = new DemoPacket();
    responsePacket.setBody(bytes);
    // 响应消息
    log.info("开始响应");
    Tio.send(channelContext, responsePacket);
    log.info("响应完成");
  }
}

TioServerConfig

package demo.tcp.config;

import java.io.IOException;

import com.litongjava.jfinal.aop.annotation.AConfiguration;
import com.litongjava.jfinal.aop.annotation.AInitialization;
import com.litongjava.tio.server.ServerTioConfig;
import com.litongjava.tio.server.TioServer;
import com.litongjava.tio.server.intf.ServerAioHandler;
import com.litongjava.tio.server.intf.ServerAioListener;

import demo.tcp.server.DemoTioServerHandler;
import demo.tcp.server.DemoTioServerListener;

@AConfiguration
public class TioServerConfig {

  @AInitialization
  public void demoTioServer() {
    // handler, 包括编码、解码、消息处理
    ServerAioHandler serverHandler = new DemoTioServerHandler();
    // 事件监听器,可以为null,但建议自己实现该接口,可以参考showcase了解些接口
    ServerAioListener serverListener = new DemoTioServerListener();
    // 配置对象
    ServerTioConfig tioServerConfig = new ServerTioConfig("tcp-server", serverHandler, serverListener);

    // 设置心跳,-1 取消心跳
    tioServerConfig.setHeartbeatTimeout(-1);
    // TioServer对象
    TioServer tioServer = new TioServer(tioServerConfig);

    // 启动服务
    try {
      tioServer.start(null, 9998);
    } catch (IOException e) {
      e.printStackTrace();
    }
  }
}

启动类

package demo.tcp;

import com.litongjava.hotswap.wrapper.tio.boot.TioApplicationWrapper;
import com.litongjava.jfinal.aop.annotation.AComponentScan;

@AComponentScan
public class Main {
  public static void main(String[] args) {
    long start = System.currentTimeMillis();
    TioApplicationWrapper.run(Main.class, args);
    long end = System.currentTimeMillis();
    System.out.println((end - start) + "(ms)");
  }
}

上面的代码是一个使用 Java TIO 网络框架实现的简单服务器应用的示例。让我们逐部分进行解释:

DemoPacket 类(数据包定义)

  • 目的:为服务器定义一个自定义的数据包结构。在网络通信中,数据包是数据的格式化单位。
  • 主要元素
    • 继承自 Tio 框架的 Packet 类。
    • 包含一个 byte[] body 用于存储数据载荷。

DemoTioServerListener 类(服务器事件监听器)

  • 目的:实现 ServerAioListener 接口,定义各种服务器事件的行为。
  • 关键功能
    • onAfterConnectedonAfterDecoded 等:在特定事件(如连接或解码数据包)后触发的方法。
    • onBeforeClose:在关闭连接之前执行的操作,例如解绑令牌。
    • onHeartbeatTimeout:管理超时的连接。

DemoTioServerHandler 类(服务器处理器)

  • 目的:实现 ServerAioHandler 接口,处理数据包的编码、解码和处理。
  • 关键功能
    • decode:将传入的原始数据转换为 DemoPacket 对象。
    • encode:将 DemoPacket 对象转换为传输的原始数据。
    • handler:处理接收到的数据包并发送响应。

TioServerConfig 类(tio-boot 配置)

  • 目的:使用 tio-boot 框架的注解来配置并启动 Tio 服务器。
  • 主要元素
    • @AConfiguration 注解标记,表示这是一个 tio-boot 配置类。
    • 包含一个用 @AInitialization 注解的方法 demoTioServer,tio-boot 框架启动时会执行改方法,该方法启动 tioServer
    • 配置心跳超时、服务器处理器和监听器。
    • 在指定端口(6789)上启动服务器。

整体流程

  1. 数据包定义:自定义数据包(DemoPacket)来携带数据。
  2. 事件处理DemoTioServerListener 监听服务器事件,如连接、断开连接和心跳。
  3. 数据处理DemoTioServerHandler 处理数据包的编码和解码,并处理传入的消息。
  4. 服务器设置和启动TioServerConfig 配置并启动 Tio 服务器,使用定义的处理器和监听器。

这段代码演示了 TIO 服务器的基本但完整的设置,包括数据包处理、事件监听、消息处理,以及与 tio-boot 框架的集成,便于管理和配置。

启动 tcp client

Tio-Boot 同样内置了 tcp-client,你可是使用 tcpClient 向服务端发送消息

DemoClientAioHandler

package demo.tcp.client;

import java.nio.ByteBuffer;

import com.litongjava.tio.client.intf.ClientAioHandler;
import com.litongjava.tio.core.ChannelContext;
import com.litongjava.tio.core.TioConfig;
import com.litongjava.tio.core.exception.TioDecodeException;
import com.litongjava.tio.core.intf.Packet;

import demo.tcp.server.DemoPacket;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class DemoClientAioHandler implements ClientAioHandler {

  /**
   * 解码:把接收到的ByteBuffer解码成应用可以识别的业务消息包
   */
  @Override
  public Packet decode(ByteBuffer buffer, int limit, int position, int readableLength, ChannelContext var5)
      throws TioDecodeException {
    log.info("buffer:{}", buffer);
    // 转换前准备ByteBuffer
    int length = buffer.remaining();
    // 获取由ByteBuffer支持的字节数组
    byte[] bytes = new byte[length];
    buffer.get(bytes);
    // 封装为DemoPacket
    DemoPacket imPackage = new DemoPacket();
    imPackage.setBody(bytes);
    return imPackage;

  }

  /**
   * 编码:把业务消息包编码为可以发送的ByteBuffer
   */
  @Override
  public ByteBuffer encode(Packet packet, TioConfig tioConfig, ChannelContext chanelContext) {

    DemoPacket helloPacket = (DemoPacket) packet;
    byte[] body = helloPacket.getBody();
    // ByteBuffer的总长度是消息体长度
    int bodyLength = body.length;
    log.info("encode:{}", bodyLength);

    // 创建一个新的ByteBuffer
    ByteBuffer buffer = ByteBuffer.allocate(bodyLength);
    // 设置字节序
    buffer.order(tioConfig.getByteOrder());
    // 消息消息体
    buffer.put(body);
    return buffer;
  }

  /**
   * 处理消息
   */
  @Override
  public void handler(Packet packet, ChannelContext var2) throws Exception {
    log.info("handler");
    DemoPacket helloPacket = (DemoPacket) packet;
    byte[] body = helloPacket.getBody();
    if (body != null) {
      String str = new String(body);
      System.out.println("received::" + str);
    }
  }

  /**
   * 此方法如果返回null,框架层面则不会发出心跳,如果返回非null,框架层面会定时发送本方法返回的消息包
   */
  @Override
  public Packet heartbeatPacket(ChannelContext var1) {
    return null;
  }
}

DemoTioClient

package demo.tcp.client;

import java.io.IOException;

import com.litongjava.tio.client.ClientChannelContext;
import com.litongjava.tio.client.ClientTioConfig;
import com.litongjava.tio.client.ReconnConf;
import com.litongjava.tio.client.TioClient;
import com.litongjava.tio.client.intf.ClientAioHandler;
import com.litongjava.tio.client.intf.ClientAioListener;
import com.litongjava.tio.core.Node;
import com.litongjava.tio.core.Tio;

import demo.tcp.server.DemoPacket;

public class DemoTioClient {
  /**
   * 启动程序
   */
  public static void main(String[] args) throws Exception {

    // 服务器节点
    Node serverNode = new Node("127.0.0.1", 9998);
    // handler,包含编解码,消息处理
    ClientAioHandler clientAioHandler = new DemoClientAioHandler();
    // 初始化
    ClientChannelContext clientChannelContext = init(serverNode, clientAioHandler);
    // 发送消息
    send(clientChannelContext, "Hello,World");
  }

  public static ClientChannelContext init(Node serverNode, ClientAioHandler clientAioHandler)
      throws IOException, Exception {

    // 事件监听器,可以为null,但是建议自己实现该接口,可以参考showcase
    ClientAioListener clientAioListener = null;

    // 断链后自动连接,不自动连接设置为null
    ReconnConf reconnConf = new ReconnConf(50000L);
    // 共用上下文对象
    ClientTioConfig clientTioConfig = new ClientTioConfig(clientAioHandler, clientAioListener, reconnConf);

    // 发送消息客户端
    TioClient tioClient;
    // 客户端通道上下文,连接服务器后获得
    ClientChannelContext clientChannelContext;

    // 设置心跳时间
    clientTioConfig.setHeartbeatTimeout(0);
    // 初始化client
    tioClient = new TioClient(clientTioConfig);
    // 连接服务器
    clientChannelContext = tioClient.connect(serverNode);
    return clientChannelContext;
  }

  private static void send(ClientChannelContext clientChannelContext, String message) {
    DemoPacket helloPacket = new DemoPacket();
    helloPacket.setBody(message.getBytes());
    Tio.send(clientChannelContext, helloPacket);
  }
}

日志

测试成功后显示的日志如下

客户端日志

18:50:14.715 [tio-timer-reconnect-1] ERROR com.litongjava.tio.client.TioClient - closeds:0, connections:0
18:50:14.715 [tio-timer-heartbeat1] WARN com.litongjava.tio.client.TioClient - The user has cancelled the heartbeat sending function at the frame level, and asks the user to complete the heartbeat mechanism by himsel
18:50:14.992 [tio-group-2] INFO com.litongjava.tio.client.ConnectionCompletionHandler - connected to 127.0.0.1:9998
18:50:14.995 [tio-worker-2] INFO demo.tcp.client.DemoClientAioHandler - encode:11
18:50:14.997 [tio-group-3] DEBUG com.litongjava.tio.core.ChannelContext - server:127.0.0.1:9998, client:0:0:0:0:0:0:0:0:1311 Sent
18:50:14.998 [tio-group-4] INFO demo.tcp.client.DemoClientAioHandler - buffer:java.nio.HeapByteBuffer[pos=0 lim=51 cap=20480]
18:50:14.999 [tio-group-4] DEBUG com.litongjava.tio.core.task.DecodeRunnable - server:127.0.0.1:9998, client:0:0:0:0:0:0:0:0:1311, Unpacking to get a packet:
18:50:15.000 [tio-group-4] INFO demo.tcp.client.DemoClientAioHandler - handler
received::收到了你的消息,你的消息是:Hello,World
18:50:15.000 [tio-group-4] DEBUG com.litongjava.tio.core.task.DecodeRunnable - server:127.0.0.1:9998, client:0:0:0:0:0:0:0:0:1311,After grouping the packets, the data just ran out

服务端日志

18:50:14.997 [tio-group-10] INFO demo.tcp.server.DemoTioServerHandler - buffer:java.nio.HeapByteBuffer[pos=0 lim=11 cap=20480]
18:50:14.997 [tio-group-10] DEBUG com.litongjava.tio.core.task.DecodeRunnable - server:0.0.0.0:9998, client:127.0.0.1:1311, Unpacking to get a packet:
18:50:14.997 [tio-group-10] INFO demo.tcp.server.DemoTioServerHandler - received:Hello,World
18:50:14.997 [tio-group-10] INFO demo.tcp.server.DemoTioServerHandler - sendMessage:收到了你的消息,你的消息是:Hello,World
18:50:14.997 [tio-group-10] INFO demo.tcp.server.DemoTioServerHandler - 开始响应
18:50:14.997 [tio-group-10] INFO demo.tcp.server.DemoTioServerHandler - 响应完成
18:50:14.997 [tio-group-10] DEBUG com.litongjava.tio.core.task.DecodeRunnable - server:0.0.0.0:9998, client:127.0.0.1:1311,After grouping the packets, the data just ran out
18:50:14.998 [tio-worker-5] INFO demo.tcp.server.DemoTioServerHandler - encode:51
18:50:14.998 [tio-group-11] DEBUG com.litongjava.tio.core.ChannelContext - server:0.0.0.0:9998, client:127.0.0.1:1311 Sent
18:50:27.310 [tio-worker-6] INFO demo.tcp.server.DemoTioServerListener - 关闭后清除认证信息
18:50:27.310 [tio-worker-6] DEBUG com.litongjava.tio.core.maintain.Tokens - tcp-server, server:0.0.0.0:9998, client:127.0.0.1:1311, 并没有绑定Token
18:50:27.310 [tio-worker-6] DEBUG com.litongjava.tio.core.maintain.Users - tcp-server, server:0.0.0.0:9998, client:127.0.0.1:1311, 并没有绑定用户
18:50:27.310 [tio-worker-6] DEBUG com.litongjava.tio.core.maintain.Tokens - tcp-server, server:0.0.0.0:9998, client:127.0.0.1:1311, 并没有绑定Token

测试代码地址

https://github.com/litongjava/java-ee-tio-boot-study/tree/main/tio-boot-latest-study/tio-boot-tcp-server-demo01