深入解析 Tio 源码:构建高性能 Java 网络应用
在现代网络应用中,处理大量并发连接和高效的数据传输是至关重要的。Tio(简称 t-io)作为一个基于 Java 的高性能网络通信框架,旨在简化这一过程。本文将基于提供的代码片段,深入解析 t-io 的源码结构和关键技术细节,帮助开发者更好地理解并应用这一框架。
目录
- Tio 框架概述
- 核心配置类:TioConfig
- 服务器配置:ServerTioConfig
- 服务器启动与监听:TioServer
- 连接接收处理:AcceptCompletionHandler
- 数据读取与解码:ReadCompletionHandler 与 DecodeTask
- 消息处理:HandlePacketTask
- 网络通信管理:Tio
- 异常处理与连接关闭:CloseTask
- 总结与技术要点
Tio 框架概述
Tio 是一个基于 Java AIO 的网络通信框架,旨在提供高性能、易用性和可扩展性。它支持 TCP、WebSocket 等协议,具备心跳机制、IP 黑名单、群组管理等功能。通过抽象化的配置和事件驱动的处理方式,Tio 简化了网络应用的开发过程。
核心配置类:TioConfig
TioConfig
是 Tio 框架的核心配置类,负责管理所有与网络通信相关的配置和状态。以下是对其关键部分的详细解析。
类结构与成员变量
public abstract class TioConfig extends MapWithLockPropSupport {
static Logger log = LoggerFactory.getLogger(TioConfig.class);
public static final Set<ServerTioConfig> ALL_SERVER_GROUPCONTEXTS = new HashSet<>();
public static final Set<ClientTioConfig> ALL_CLIENT_GROUPCONTEXTS = new HashSet<>();
public static final Set<TioConfig> ALL_GROUPCONTEXTS = new HashSet<>();
public static final int READ_BUFFER_SIZE = Integer.getInteger("tio.default.read.buffer.size", 20480);
private final static AtomicInteger ID_ATOMIC = new AtomicInteger();
private ByteOrder byteOrder = ByteOrder.BIG_ENDIAN;
public boolean isShortConnection = false;
public SslConfig sslConfig = null;
public boolean debug = false;
public GroupStat groupStat = null;
public boolean statOn = true;
public PacketConverter packetConverter = null;
private CacheFactory cacheFactory;
@SuppressWarnings("rawtypes")
private RemovalListenerWrapper ipRemovalListenerWrapper;
public long startTime = SystemTimer.currTime;
public long heartbeatTimeout = 1000 * 120;
public boolean logWhenDecodeError = false;
private int readBufferSize = READ_BUFFER_SIZE;
private GroupListener groupListener = null;
private AioId tioUuid = new DefaultTAioId();
public ClientNodes clientNodes = new ClientNodes();
public SetWithLock<ChannelContext> connections = new SetWithLock<ChannelContext>(new HashSet<ChannelContext>());
public Groups groups = new Groups();
public Users users = new Users();
public Tokens tokens = new Tokens();
public Ids ids = new Ids();
public BsIds bsIds = new BsIds();
public Ips ips = new Ips();
public IpStats ipStats = new IpStats(this, null);;
protected String id;
protected int maxDecodeErrorCountForIp = 10;
protected String name = "Untitled";
private IpStatListener ipStatListener = DefaultIpStatListener.me;
private boolean isStopped = false;
public IpBlacklist ipBlacklist = null;
public MapWithLock<Integer, Packet> waitingResps = new MapWithLock<Integer, Packet>(new HashMap<Integer, Packet>());
public boolean disgnostic = EnvUtils.getBoolean(TioCoreConfigKeys.TIO_CORE_DIAGNOSTIC);
// 构造函数与初始化方法略
}
主要成员变量解析
连接管理
clientNodes
:管理所有客户端节点。connections
:当前所有的连接集合。groups
:管理群组信息。users
、tokens
、ids
、bsIds
、ips
:分别管理用户、令牌、ID、业务 ID、IP 相关的信息。
缓存与统计
cacheFactory
:用于创建缓存工厂,支持不同类型的缓存实现。ipStats
:IP 统计信息,支持多种统计时段。groupStat
:群组统计信息。heartbeatTimeout
:心跳超时时间,默认 120 秒。
配置与状态
byteOrder
:字节序,默认大端。sslConfig
:SSL 配置,用于启用加密通信。debug
:调试模式开关。statOn
:统计开关。packetConverter
:自定义包转换器。ipBlacklist
:IP 黑名单管理。
生命周期管理
startTime
:配置启动时间。isStopped
:停止标志。id
:唯一标识符,通过AtomicInteger
递增生成。name
:配置名称。
关键方法解析
初始化方法
init()
方法负责初始化配置,注册到全局上下文集合,并设置默认的 IP 移除监听器。public void init() { if (cacheFactory == null) { this.cacheFactory = ConcurrentMapCacheFactory.INSTANCE; } if (ipRemovalListenerWrapper == null) { setDefaultIpRemovalListenerWrapper(); } ALL_GROUPCONTEXTS.add(this); if (this instanceof ServerTioConfig) { ALL_SERVER_GROUPCONTEXTS.add((ServerTioConfig) this); } else { ALL_CLIENT_GROUPCONTEXTS.add((ClientTioConfig) this); } if (ALL_GROUPCONTEXTS.size() > 20) { log.warn("You have created {} TioConfig objects, you might be misusing t-io.", ALL_GROUPCONTEXTS.size()); } this.id = ID_ATOMIC.incrementAndGet() + ""; if (this.ipStats == null) { this.ipStats = new IpStats(this, null); } }
设置默认 IP 移除监听器
如果未指定 IP 移除监听器,则设置默认的监听器,用于处理 IP 被移除后的逻辑。
@SuppressWarnings({ "rawtypes", "unchecked" }) public void setDefaultIpRemovalListenerWrapper() { this.ipRemovalListenerWrapper = new RemovalListenerWrapper(); IpStatMapCacheRemovalListener ipStatMapCacheRemovalListener = new IpStatMapCacheRemovalListener(this, ipStatListener); ipRemovalListenerWrapper.setListener(ipStatMapCacheRemovalListener); }
抽象方法
TioConfig
是一个抽象类,需要子类实现以下两个方法:public abstract AioHandler getAioHandler(); public abstract AioListener getAioListener(); public abstract boolean isServer();
getAioHandler()
:获取 AIO 处理器,用于处理数据的编码和解码。getAioListener()
:获取 AIO 监听器,用于监听连接事件。isServer()
:判断是否为服务器配置。
服务器配置:ServerTioConfig
ServerTioConfig
是TioConfig
的一个具体实现,专门用于服务器端的配置管理。
类结构与成员变量
public class ServerTioConfig extends TioConfig {
static Logger log = LoggerFactory.getLogger(ServerTioConfig.class);
private ServerAioHandler serverAioHandler = null;
private ServerAioListener serverAioListener = null;
private Thread checkHeartbeatThread = null;
private boolean needCheckHeartbeat = true;
private boolean isShared = false;
// 构造函数与其他方法略
}
主要成员变量解析
AIO 处理器与监听器
serverAioHandler
:服务器端 AIO 处理器,负责数据的编码和解码。serverAioListener
:服务器端 AIO 监听器,监听连接事件和状态变化。
心跳检测
checkHeartbeatThread
:用于定时检查连接的心跳线程。needCheckHeartbeat
:是否需要进行心跳检测的标志。
共享配置
isShared
:标志配置是否被共享,影响心跳检测的逻辑。
关键方法解析
使用 SSL
useSsl
方法用于配置 SSL,加密通信数据。public void useSsl(String keyStoreFile, String trustStoreFile, String keyStorePwd) throws Exception { if (StrUtil.isNotBlank(keyStoreFile) && StrUtil.isNotBlank(trustStoreFile)) { SslConfig sslConfig = SslConfig.forServer(keyStoreFile, trustStoreFile, keyStorePwd); this.setSslConfig(sslConfig); } } public void useSsl(InputStream keyStoreInputStream, InputStream trustStoreInputStream, String passwd) throws Exception { SslConfig sslConfig = SslConfig.forServer(keyStoreInputStream, trustStoreInputStream, passwd); this.setSslConfig(sslConfig); }
初始化方法
init()
方法不仅初始化基础配置,还启动心跳检测线程。public void init() { super.init(); this.groupStat = new ServerGroupStat(); GlobalIpBlacklist.INSTANCE.init(this); Runnable check = new Runnable() { @Override public void run() { // 心跳检测逻辑 } }; checkHeartbeatThread = new Thread(check, "tio-timer-checkheartbeat-" + id + "-" + name); checkHeartbeatThread.setDaemon(true); checkHeartbeatThread.setPriority(Thread.MIN_PRIORITY); checkHeartbeatThread.start(); }
心跳检测线程
心跳检测线程定期扫描所有连接,判断是否超时未响应,进行必要的处理。
Runnable check = new Runnable() { @Override public void run() { // 初始等待 Thread.sleep(1000 * 10); while (needCheckHeartbeat && !isStopped()) { if (heartbeatTimeout <= 0) { break; } try { Thread.sleep(heartbeatTimeout); } catch (InterruptedException e1) { log.error(e1.toString(), e1); } long start = SystemTimer.currTime; SetWithLock<ChannelContext> setWithLock = connections; Set<ChannelContext> set = null; ReadLock readLock = setWithLock.readLock(); readLock.lock(); try { set = setWithLock.getObj(); for (ChannelContext channelContext : set) { long compareTime = Math.max(channelContext.stat.latestTimeOfReceivedByte, channelContext.stat.latestTimeOfSentPacket); long currtime = SystemTimer.currTime; long interval = currtime - compareTime; boolean needRemove = false; if (channelContext.heartbeatTimeout != null && channelContext.heartbeatTimeout > 0) { needRemove = interval > channelContext.heartbeatTimeout; } else { needRemove = interval > heartbeatTimeout; } if (needRemove) { if (!ServerTioConfig.this.serverAioListener.onHeartbeatTimeout(channelContext, interval, channelContext.stat.heartbeatTimeoutCount.incrementAndGet())) { log.info("{}, {} ms or not send and receive message", channelContext, interval); channelContext.setCloseCode(CloseCode.HEARTBEAT_TIMEOUT); Tio.remove(channelContext, interval + " ms not send and receive message"); } } } } catch (Throwable e) { log.error("", e); } finally { try { readLock.unlock(); if (debug) { diagnostic(start, set, start1, count); } } catch (Throwable e) { log.error("", e); } } } } };
技术细节解析
心跳机制
心跳机制通过定期检查连接的最近接收和发送时间,判断连接是否活跃。如果超过超时时间未有数据交互,则触发超时处理,包括调用监听器的
onHeartbeatTimeout
方法,并可能关闭连接。线程与并发
- 使用
ReentrantReadWriteLock
管理连接集合的并发访问,确保高效的读写操作。 - 心跳检测运行在一个独立的守护线程中,确保不会阻塞主线程。
- 使用
统计与日志
- 统计信息包括已接受连接数、当前连接数、发送和接收的数据量等。
- 在调试模式下,定期输出详细的诊断信息,帮助开发者监控系统状态。
服务器启动与监听:TioServer
TioServer
类负责启动服务器,绑定端口,监听连接,并处理接收到的连接请求。
类结构与成员变量
public class TioServer {
private static Logger log = LoggerFactory.getLogger(TioServer.class);
private ServerTioConfig serverTioConfig;
private AsynchronousServerSocketChannel serverSocketChannel;
private Node serverNode;
private boolean isWaitingStop = false;
private boolean checkLastVersion = true;
private ExecutorService groupExecutor;
private AsynchronousChannelGroup channelGroup;
// 构造函数与其他方法略
}
主要成员变量解析
配置与节点
serverTioConfig
:服务器配置对象。serverNode
:服务器节点信息,包括 IP 和端口。
网络通信
serverSocketChannel
:异步服务器套接字通道,负责接受客户端连接。channelGroup
:异步通道组,管理一组异步通道。
状态管理
isWaitingStop
:标志服务器是否正在等待停止。checkLastVersion
:版本检查开关。
线程与执行器
groupExecutor
:用于管理异步通道的线程池。
关键方法解析
启动服务器
start()
方法负责初始化服务器配置,绑定端口,启动监听,并接受连接。public void start(String serverIp, int serverPort) throws IOException { serverTioConfig.init(); serverTioConfig.getCacheFactory().register(TioCoreConfigKeys.REQEUST_PROCESSING, null, null, null); this.serverNode = new Node(serverIp, serverPort); if (EnvUtils.getBoolean("tio.core.hotswap.reload", false)) { groupExecutor = Threads.getGroupExecutor(); channelGroup = AsynchronousChannelGroup.withThreadPool(groupExecutor); serverSocketChannel = AsynchronousServerSocketChannel.open(channelGroup); } else { serverSocketChannel = AsynchronousServerSocketChannel.open(); } serverSocketChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true); serverSocketChannel.setOption(StandardSocketOptions.SO_RCVBUF, 64 * 1024); InetSocketAddress listenAddress = null; if (StrUtil.isBlank(serverIp)) { listenAddress = new InetSocketAddress(serverPort); } else { listenAddress = new InetSocketAddress(serverIp, serverPort); } serverSocketChannel.bind(listenAddress, 0); AcceptCompletionHandler acceptCompletionHandler = new AcceptCompletionHandler(); serverSocketChannel.accept(this, acceptCompletionHandler); serverTioConfig.startTime = System.currentTimeMillis(); Threads.getTioExecutor(); }
步骤解析:
- 初始化配置:调用
serverTioConfig.init()
,确保所有配置和状态被正确初始化。 - 绑定端口:根据提供的 IP 和端口,绑定服务器套接字通道。
- 启动监听:创建
AcceptCompletionHandler
实例,开始接受客户端连接。 - 心跳线程:初始化和启动心跳检测线程,确保连接的活跃性。
- 初始化配置:调用
停止服务器
stop()
方法负责优雅地关闭服务器,释放资源,并关闭所有连接。public boolean stop() { isWaitingStop = true; if (channelGroup != null) { try { channelGroup.shutdownNow(); } catch (Exception e) { log.error("Faild to execute channelGroup.shutdownNow()", e); } } if (groupExecutor != null) { try { groupExecutor.shutdownNow(); } catch (Exception e) { log.error("Failed to close groupExecutor", e); } } if (serverSocketChannel != null) { try { serverSocketChannel.close(); } catch (Exception e) { log.error("Failed to close serverSocketChannel", e); } } serverTioConfig.setStopped(true); boolean ret = Threads.close(); log.info(this.serverNode + " stopped"); return ret; }
步骤解析:
- 标记停止:设置
isWaitingStop
为true
,通知其他组件服务器即将停止。 - 关闭异步通道组:尝试关闭
channelGroup
,终止所有关联的异步操作。 - 关闭执行器:关闭线程池
groupExecutor
,释放线程资源。 - 关闭服务器套接字通道:关闭
serverSocketChannel
,停止接受新连接。 - 更新配置状态:设置
serverTioConfig
的停止标志,并关闭线程池。
- 标记停止:设置
版本检查
checkLastVersion
用于控制是否进行版本检查,但在提供的代码中仅记录了日志,未实际实现版本检查逻辑。public void setCheckLastVersion(boolean checkLastVersion) { log.debug("community edition is no longer supported"); }
连接接收处理:AcceptCompletionHandler
AcceptCompletionHandler
类实现了CompletionHandler
接口,负责处理新接收到的客户端连接。
类结构与成员变量
public class AcceptCompletionHandler implements CompletionHandler<AsynchronousSocketChannel, TioServer> {
private static Logger log = LoggerFactory.getLogger(AcceptCompletionHandler.class);
public AcceptCompletionHandler() {}
@Override
public void completed(AsynchronousSocketChannel clientSocketChannel, TioServer tioServer) {
// 连接完成后的处理逻辑
}
@Override
public void failed(Throwable exc, TioServer tioServer) {
// 连接失败后的处理逻辑
}
}
主要方法解析
连接成功处理
completed()
方法在成功接收到一个客户端连接时被调用。@Override public void completed(AsynchronousSocketChannel clientSocketChannel, TioServer tioServer) { AsynchronousServerSocketChannel serverSocketChannel = tioServer.getServerSocketChannel(); if (tioServer.isWaitingStop()) { log.info("The server will be shut down and no new requests will be accepted:{}", tioServer.getServerNode()); } else { serverSocketChannel.accept(tioServer, this); } if (serverSocketChannel == null) { log.info("receive serverSocketChannel is null skip"); return; } if (!serverSocketChannel.isOpen()) { log.info("receive serverSocketChannel is not open skip"); return; } String clientIp = null; int port = 0; InetSocketAddress inetSocketAddress; try { inetSocketAddress = (InetSocketAddress) clientSocketChannel.getRemoteAddress(); clientIp = inetSocketAddress.getHostString(); port = inetSocketAddress.getPort(); if (EnvUtils.getBoolean(TioCoreConfigKeys.TIO_CORE_DIAGNOSTIC, false)) { log.info("new connection:{},{}", clientIp, port); } } catch (IOException e1) { log.error("Failed to get client ip and port", e1); } ServerTioConfig serverTioConfig = tioServer.getServerTioConfig(); try { if (IpBlacklist.isInBlacklist(serverTioConfig, clientIp)) { log.info("{} on the blacklist, {}", clientIp, serverTioConfig.getName()); clientSocketChannel.close(); return; } if (serverTioConfig.statOn) { ((ServerGroupStat) serverTioConfig.groupStat).accepted.incrementAndGet(); } clientSocketChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true); clientSocketChannel.setOption(StandardSocketOptions.SO_RCVBUF, 64 * 1024); clientSocketChannel.setOption(StandardSocketOptions.SO_SNDBUF, 64 * 1024); clientSocketChannel.setOption(StandardSocketOptions.SO_KEEPALIVE, true); ServerChannelContext channelContext = new ServerChannelContext(serverTioConfig, clientSocketChannel, clientIp, port); channelContext.setClosed(false); channelContext.stat.setTimeFirstConnected(SystemTimer.currTime); channelContext.setServerNode(tioServer.getServerNode()); boolean isConnected = true; boolean isReconnect = false; if (serverTioConfig.getServerAioListener() != null) { if (!SslUtils.isSsl(channelContext.tioConfig)) { try { serverTioConfig.getServerAioListener().onAfterConnected(channelContext, isConnected, isReconnect); } catch (Throwable e) { log.error("ServerAioListener onAfterConnected:", e); } } } if (CollUtil.isNotEmpty(serverTioConfig.ipStats.durationList)) { try { for (Long v : serverTioConfig.ipStats.durationList) { IpStat ipStat = (IpStat) serverTioConfig.ipStats.get(v, channelContext); ipStat.getRequestCount().incrementAndGet(); serverTioConfig.getIpStatListener().onAfterConnected(channelContext, isConnected, isReconnect, ipStat); } } catch (Exception e) { log.error("IpStatListener onAfterConnected:", e); } } if (!tioServer.isWaitingStop()) { ReadCompletionHandler readCompletionHandler = new ReadCompletionHandler(channelContext); ByteBuffer readByteBuffer = ByteBufferPool.BUFFER_POOL.acquire(serverTioConfig.getByteOrder()); readByteBuffer.position(0); readByteBuffer.limit(readByteBuffer.capacity()); clientSocketChannel.read(readByteBuffer, readByteBuffer, readCompletionHandler); } } catch (Throwable e) { log.error("Failed to read data from :{},{}", clientIp, port); e.printStackTrace(); } }
步骤解析:
- 重复接受连接:在每次成功接受连接后,立即调用
serverSocketChannel.accept()
继续接受下一个连接。 - 获取客户端 IP 和端口:通过
clientSocketChannel.getRemoteAddress()
获取连接的客户端信息。 - IP 黑名单检查:如果客户端 IP 在黑名单中,则拒绝连接。
- 配置 Socket 选项:设置
SO_REUSEADDR
、接收和发送缓冲区大小、SO_KEEPALIVE
等选项,优化连接性能。 - 创建 ChannelContext:为新连接创建
ChannelContext
对象,管理连接的状态和统计信息。 - 触发连接事件:调用
serverAioListener.onAfterConnected()
方法,通知监听器连接已建立。 - 心跳统计:更新 IP 统计信息。
- 开始读取数据:为新连接创建
ReadCompletionHandler
,并开始异步读取数据。
- 重复接受连接:在每次成功接受连接后,立即调用
连接失败处理
failed()
方法在接受连接失败时被调用。@Override public void failed(Throwable exc, TioServer tioServer) { if (tioServer.isWaitingStop()) { log.info("The server will be shut down and no new requests will be accepted:{}", tioServer.getServerNode()); } else { AsynchronousServerSocketChannel serverSocketChannel = tioServer.getServerSocketChannel(); serverSocketChannel.accept(tioServer, this); log.error("[" + tioServer.getServerNode() + "] listening exception", exc); } }
步骤解析:
- 停止标志检查:如果服务器正在等待停止,则不再接受新连接。
- 继续接受连接:如果未停止,继续调用
accept()
方法,保持服务器的可用性。 - 日志记录:记录连接失败的异常信息,便于调试和监控。
技术细节解析
异步 NIO
使用 Java AIO 的
AsynchronousServerSocketChannel
和AsynchronousSocketChannel
实现非阻塞的连接接受和数据传输,提升系统的并发处理能力。IP 黑名单
在接收到新连接时,首先检查客户端 IP 是否在黑名单中。如果是,则拒绝连接,增强系统的安全性。
Socket 选项优化
SO_REUSEADDR
:允许重新使用本地地址,避免在服务器重启后因端口占用而无法绑定。SO_RCVBUF
和SO_SNDBUF
:设置接收和发送缓冲区大小,优化数据传输性能。SO_KEEPALIVE
:启用 TCP 保活,检测连接的有效性。
连接上下文管理
为每个连接创建独立的
ChannelContext
,负责管理连接的状态、统计信息和配置,确保高效的连接管理。
数据读取与解码:ReadCompletionHandler 与 DecodeTask
数据读取和解码是网络通信的关键环节,涉及数据的接收、解码、处理等多个步骤。Tio 通过ReadCompletionHandler
和DecodeTask
实现这一过程。
ReadCompletionHandler
ReadCompletionHandler
类实现了CompletionHandler
接口,负责处理异步读取操作完成后的回调。
类结构与成员变量
public class ReadCompletionHandler implements CompletionHandler<Integer, ByteBuffer> {
private static Logger log = LoggerFactory.getLogger(ReadCompletionHandler.class);
private ChannelContext channelContext = null;
public ReadCompletionHandler(ChannelContext channelContext) {
this.channelContext = channelContext;
}
@Override
public void completed(Integer result, ByteBuffer byteBuffer) {
// 处理读取完成后的逻辑
}
@Override
public void failed(Throwable exc, ByteBuffer byteBuffer) {
// 处理读取失败后的逻辑
}
}
关键方法解析
读取完成处理
completed()
方法在数据读取完成时被调用,负责处理接收到的数据。@Override public void completed(Integer result, ByteBuffer byteBuffer) { if (result > 0) { TioConfig tioConfig = channelContext.tioConfig; if (tioConfig.statOn) { tioConfig.groupStat.receivedBytes.addAndGet(result); tioConfig.groupStat.receivedTcps.incrementAndGet(); channelContext.stat.receivedBytes.addAndGet(result); channelContext.stat.receivedTcps.incrementAndGet(); } channelContext.stat.latestTimeOfReceivedByte = SystemTimer.currTime; if (CollUtil.isNotEmpty(tioConfig.ipStats.durationList)) { try { for (Long v : tioConfig.ipStats.durationList) { IpStat ipStat = tioConfig.ipStats.get(v, channelContext); ipStat.getReceivedBytes().addAndGet(result); ipStat.getReceivedTcps().incrementAndGet(); tioConfig.getIpStatListener().onAfterReceivedBytes(channelContext, result, ipStat); } } catch (Exception e1) { log.error(channelContext.toString(), e1); } } if (tioConfig.getAioListener() != null) { try { tioConfig.getAioListener().onAfterReceivedBytes(channelContext, result); } catch (Exception e) { log.error(channelContext.toString(), e); } } byteBuffer.flip(); if (channelContext.sslFacadeContext == null) { new DecodeTask().decode(channelContext, byteBuffer); } else { ByteBuffer copiedByteBuffer = null; try { copiedByteBuffer = ByteBufferUtils.copy(byteBuffer); log.debug("{},Decrypt SSL data:{}", channelContext, copiedByteBuffer); channelContext.sslFacadeContext.getSslFacade().decrypt(copiedByteBuffer); } catch (Exception e) { log.error(channelContext + ", " + e.toString() + copiedByteBuffer, e); Tio.close(channelContext, e, e.toString(), CloseCode.SSL_DECRYPT_ERROR); } } if (TioUtils.checkBeforeIO(channelContext)) { read(byteBuffer); } else { ByteBufferPool.BUFFER_POOL.release(byteBuffer); } } else if (result == 0) { String message = "The length of the read data is 0"; log.error("close {}, because {}", channelContext, message); Tio.close(channelContext, null, message, CloseCode.READ_COUNT_IS_ZERO); ByteBufferPool.BUFFER_POOL.release(byteBuffer); return; } else if (result < 0) { if (result == -1) { String message = "The connection closed by peer"; if (EnvUtils.getBoolean(TioCoreConfigKeys.TIO_CORE_DIAGNOSTIC, false)) { log.info("close {}, because {}", channelContext.getClientIpAndPort(), message); } Tio.close(channelContext, null, message, CloseCode.CLOSED_BY_PEER); ByteBufferPool.BUFFER_POOL.release(byteBuffer); return; } else { String message = "The length of the read data is less than -1"; Tio.close(channelContext, null, "读数据时返回" + result, CloseCode.READ_COUNT_IS_NEGATIVE); log.error("close {}, because {}", channelContext, message); ByteBufferPool.BUFFER_POOL.release(byteBuffer); return; } } }
步骤解析:
数据有效性检查:根据
result
判断读取的数据长度,处理不同情况。result > 0
:成功读取到数据,进行统计和解码处理。result == 0
:读取到的数据长度为 0,关闭连接。result < 0
:读取错误,可能是对端关闭连接,进行相应处理。
统计信息更新:根据读取的数据长度,更新全局和连接级别的统计信息。
SSL 解密:如果连接启用了 SSL,则对接收到的数据进行解密处理。
数据解码:通过
DecodeTask
进行数据的解码,转换为业务包。继续读取数据:调用
read()
方法,继续异步读取数据,保持连接的持续活跃。
读取失败处理
failed()
方法在数据读取失败时被调用,负责关闭连接。@Override public void failed(Throwable exc, ByteBuffer byteBuffer) { Tio.close(channelContext, exc, "Failed to read data: " + exc.getClass().getName(), CloseCode.READ_ERROR); }
步骤解析:
- 关闭连接:由于读取失败,调用
Tio.close()
方法关闭连接,释放资源。 - 日志记录:记录失败的异常信息,便于问题排查。
- 关闭连接:由于读取失败,调用
技术细节解析
异步读取
使用 Java AIO 的
AsynchronousSocketChannel
进行非阻塞的数据读取,提升系统的并发处理能力。心跳机制
通过更新
latestTimeOfReceivedByte
和latestTimeOfSentPacket
,监控连接的活跃性,结合心跳检测线程,确保连接的有效性。SSL 支持
- 解密处理:在数据读取后,若启用了 SSL,则对接收到的数据进行解密,确保数据的安全性。
- 错误处理:如果解密过程中出现异常,则关闭连接,防止潜在的安全风险。
缓冲区管理
- 使用
ByteBufferPool
管理缓冲区,减少内存分配和回收的开销,提升性能。 - 通过
ByteBuffer.flip()
准备数据进行读取和解码。
- 使用
消息处理:HandlePacketTask
HandlePacketTask
类负责将解码后的业务包交给业务逻辑处理器,进行具体的业务处理。
类结构与成员变量
public class HandlePacketTask {
private AtomicLong synFailCount = new AtomicLong();
public void handler(ChannelContext channelContext, Packet packet) {
// 处理业务包的逻辑
}
}
关键方法解析
业务包处理
handler()
方法负责将业务包传递给 AIO 处理器,并更新统计信息。public void handler(ChannelContext channelContext, Packet packet) { TioConfig tioConfig = channelContext.tioConfig; if (packet.isKeepConnection() && !channelContext.isBind) { tioConfig.ips.bind(channelContext); tioConfig.ids.bind(channelContext); channelContext.groups = new SetWithLock<String>(new HashSet<>()); channelContext.isBind = true; } long start = SystemTimer.currTime; try { Integer synSeq = packet.getSynSeq(); if (synSeq != null && synSeq > 0) { MapWithLock<Integer, Packet> syns = tioConfig.getWaitingResps(); Packet initPacket = syns.remove(synSeq); if (initPacket != null) { synchronized (initPacket) { syns.put(synSeq, packet); initPacket.notify(); } } else { log.error("[{}] Failed to synchronize message, synSeq is {}, but there is no corresponding key value in the synchronization collection", synFailCount.incrementAndGet(), synSeq); } } else { if (EnvUtils.getBoolean(TioCoreConfigKeys.TIO_CORE_DIAGNOSTIC, false)) { Long id = packet.getId(); String requestInfo = channelContext.getClientIpAndPort() + "_" + id; log.info("handle:{}", requestInfo); } tioConfig.getAioHandler().handler(packet, channelContext); } } catch (Throwable e) { e.printStackTrace(); } finally { long end = SystemTimer.currTime; long iv = end - start; if (tioConfig.statOn) { channelContext.stat.handledPackets.incrementAndGet(); channelContext.stat.handledBytes.addAndGet(packet.getByteCount()); channelContext.stat.handledPacketCosts.addAndGet(iv); tioConfig.groupStat.handledPackets.incrementAndGet(); tioConfig.groupStat.handledBytes.addAndGet(packet.getByteCount()); tioConfig.groupStat.handledPacketCosts.addAndGet(iv); } if (CollUtil.isNotEmpty(tioConfig.ipStats.durationList)) { try { for (Long v : tioConfig.ipStats.durationList) { IpStat ipStat = (IpStat) tioConfig.ipStats.get(v, channelContext); ipStat.getHandledPackets().incrementAndGet(); ipStat.getHandledBytes().addAndGet(packet.getByteCount()); ipStat.getHandledPacketCosts().addAndGet(iv); tioConfig.getIpStatListener().onAfterHandled(channelContext, packet, ipStat, iv); } } catch (Exception e1) { e1.printStackTrace(); } } if (tioConfig.getAioListener() != null) { try { tioConfig.getAioListener().onAfterHandled(channelContext, packet, iv); } catch (Exception e) { e.printStackTrace(); } } } }
步骤解析:
连接绑定:如果业务包要求保持连接(
isKeepConnection
),且连接尚未绑定,则将连接绑定到 IP、ID 等。同步请求处理:
- 如果业务包带有同步序列号(
synSeq
),则从等待响应的集合中查找对应的请求包,并通知等待的线程。 - 如果未找到对应的请求包,则记录同步失败计数,并记录错误日志。
- 如果业务包带有同步序列号(
异步请求处理:
- 如果业务包没有同步序列号,则直接调用 AIO 处理器的
handler()
方法,进行业务处理。
- 如果业务包没有同步序列号,则直接调用 AIO 处理器的
统计信息更新:记录处理包的数量、字节数和处理时间。
回调监听器:通知 AIO 监听器业务包已处理完毕。
技术细节解析
连接绑定
对于需要保持连接的业务包,通过绑定 IP 和 ID,确保连接的状态与业务逻辑的一致性。
同步与异步请求处理
- 同步请求:通过序列号匹配请求与响应,实现同步通信的效果。使用
CountDownLatch
等待响应。 - 异步请求:直接将业务包交给 AIO 处理器,无需等待,适用于高并发场景。
- 同步请求:通过序列号匹配请求与响应,实现同步通信的效果。使用
统计与日志
- 通过
AtomicLong
和SetWithLock
管理统计信息,确保高效的并发处理。 - 在调试模式下,记录详细的处理日志,便于问题排查。
- 通过
异常处理
- 捕获并记录业务处理过程中的异常,确保系统的稳定性。
- 对于同步请求未能找到对应的请求包,记录同步失败计数,预防潜在的同步问题。
网络通信管理:Tio
Tio
类作为 Tio 框架的核心 API,提供了丰富的方法用于管理连接、发送消息、关闭连接等操作。以下将详细解析其关键功能和技术实现。
类结构与成员变量
public class Tio {
public static class IpBlacklist {
// IP黑名单相关方法
}
private static Logger log = LoggerFactory.getLogger(Tio.class);
// 其他内部类和方法略
private Tio() {}
}
主要功能解析
IP 黑名单管理
IpBlacklist
内部类提供了对 IP 黑名单的管理功能,包括添加、删除、检查等。public static class IpBlacklist { public static boolean add(TioConfig tioConfig, String ip) { return tioConfig.ipBlacklist.add(ip); } public static boolean add(String ip) { return GlobalIpBlacklist.INSTANCE.global.add(ip); } public static void clear(TioConfig tioConfig) { tioConfig.ipBlacklist.clear(); } public static void clear() { GlobalIpBlacklist.INSTANCE.global.clear(); } public static Collection<String> getAll(TioConfig tioConfig) { return tioConfig.ipBlacklist.getAll(); } public static Collection<String> getAll() { return GlobalIpBlacklist.INSTANCE.global.getAll(); } public static boolean isInBlacklist(TioConfig tioConfig, String ip) { if (tioConfig.ipBlacklist != null) { return tioConfig.ipBlacklist.isInBlacklist(ip) || GlobalIpBlacklist.INSTANCE.global.isInBlacklist(ip); } else { return GlobalIpBlacklist.INSTANCE.global.isInBlacklist(ip); } } public static void remove(TioConfig tioConfig, String ip) { tioConfig.ipBlacklist.remove(ip); } public static void remove(String ip) { GlobalIpBlacklist.INSTANCE.global.remove(ip); } }
技术细节解析:
- 全局与局部黑名单:支持针对特定
TioConfig
的局部黑名单和全局黑名单,增强灵活性。 - 并发访问:通过
SetWithLock
和其他并发集合管理黑名单,确保线程安全。
- 全局与局部黑名单:支持针对特定
发送消息
Tio
类提供了多种发送消息的方法,包括发送到单个连接、群组、指定 IP 等。public static Boolean send(ChannelContext channelContext, Packet packet) { return send(channelContext, packet, null, null); } private static Boolean send(ChannelContext channelContext, Packet packet, CountDownLatch countDownLatch, PacketSendMode packetSendMode) { // 发送逻辑 } public static Boolean bSend(ChannelContext channelContext, Packet packet) { if (channelContext == null) { return false; } CountDownLatch countDownLatch = new CountDownLatch(1); return send(channelContext, packet, countDownLatch, PacketSendMode.SINGLE_BLOCK); } // 其他发送方法略
关键方法解析:
send()
:核心发送方法,负责将消息包传递给SendPacketTask
进行发送,并处理同步发送的逻辑。bSend()
:阻塞发送方法,适用于需要同步等待发送结果的场景。通过CountDownLatch
实现同步等待。群组与指定 IP 发送:通过遍历目标群组或 IP 对应的连接集合,批量发送消息,提高系统的广播能力。
关闭连接
Tio
提供了多种关闭连接的方法,包括关闭单个连接、群组、指定 IP 等。public static void close(ChannelContext channelContext, String remark) { close(channelContext, null, remark); } public static void close(ChannelContext channelContext, Throwable throwable, String remark) { close(channelContext, throwable, remark, false); } // 其他关闭方法略
关键方法解析:
close()
:核心关闭方法,负责关闭连接并执行必要的清理操作。remove()
:与close()
方法类似,但不执行重连等维护操作,适用于需要立即移除连接的场景。
群组管理
Tio
提供了方法用于绑定和解绑群组,管理连接的群组关系。public static void bindGroup(ChannelContext channelContext, String group) { channelContext.tioConfig.groups.bind(group, channelContext); } public static void unbindGroup(String group, ChannelContext channelContext) { channelContext.tioConfig.groups.unbind(group, channelContext); } // 其他群组管理方法略
技术细节解析:
高效的群组管理:通过
SetWithLock
和并发集合管理群组内的连接,确保高效的群组操作。灵活的群组操作:支持动态的群组绑定与解绑,适应复杂的业务需求。
技术细节解析
并发与锁机制
- 使用
ReentrantLock
和Condition
管理异步操作的同步,确保多线程环境下的操作安全。 SetWithLock
和MapWithLock
封装了线程安全的集合操作,简化了并发编程。
- 使用
心跳与连接检测
- 通过心跳检测机制,定期检查连接的活跃性,及时关闭失效连接,提升系统的稳定性。
- 使用
AtomicLong
和原子变量管理统计信息,确保高效的并发更新。
SSL 支持
- 通过
SslConfig
和SslFacadeContext
实现 SSL 加密,确保数据传输的安全性。 - 在发送和接收数据时,自动进行加密与解密,简化开发者的操作。
- 通过
缓冲区管理
- 使用
ByteBufferPool
管理缓冲区,减少内存分配与回收的开销,提升系统性能。 - 通过
ByteBufferUtils
提供的工具方法,简化缓冲区操作。
- 使用
错误处理与日志
- 通过
CompletionHandler
接口的failed()
方法统一处理读取与写入过程中的异常,确保系统的健壮性。 - 在不同的错误场景下,记录详细的日志信息,便于开发者排查问题。
- 通过
异常处理与连接关闭:CloseTask
CloseTask
类负责管理连接的关闭过程,包括释放资源、更新状态、处理重连等。
类结构与成员变量
public class CloseTask {
public static void close(ChannelContext channelContext) {
// 关闭连接的逻辑
}
}
关键方法解析
关闭连接
close()
方法负责关闭连接,并根据配置决定是否进行重连或清理操作。public static void close(ChannelContext channelContext) { boolean isNeedRemove = channelContext.closeMeta.isNeedRemove; String remark = channelContext.closeMeta.remark; Throwable throwable = channelContext.closeMeta.throwable; channelContext.stat.timeClosed = SystemTimer.currTime; try { if (channelContext.tioConfig.getAioListener() != null) { try { channelContext.tioConfig.getAioListener().onBeforeClose(channelContext, throwable, remark, isNeedRemove); } catch (Throwable e) { log.error(e.toString(), e); } } try { if (channelContext.isClosed && !isNeedRemove) { return; } if (channelContext.isRemoved) { return; } try { if (isNeedRemove) { MaintainUtils.remove(channelContext); } else { ClientTioConfig clientTioConfig = (ClientTioConfig) channelContext.tioConfig; clientTioConfig.closeds.add(channelContext); clientTioConfig.connecteds.remove(channelContext); MaintainUtils.close(channelContext); } channelContext.setRemoved(isNeedRemove); if (channelContext.tioConfig.statOn) { channelContext.tioConfig.groupStat.closed.incrementAndGet(); } channelContext.stat.timeClosed = SystemTimer.currTime; channelContext.setClosed(true); } catch (Throwable e) { log.error(e.toString(), e); } finally { if (!isNeedRemove && channelContext.isClosed && !channelContext.isServer()) { ClientChannelContext clientChannelContext = (ClientChannelContext) channelContext; ReconnConf.put(clientChannelContext); } } } catch (Throwable e) { log.error(throwable.toString(), e); } } finally { channelContext.isWaitingClose = false; } }
步骤解析:
状态检查:判断连接是否已经关闭或被移除,避免重复操作。
调用监听器:如果配置了 AIO 监听器,调用
onBeforeClose()
方法,通知监听器连接即将关闭。资源释放:
- 移除连接:如果需要移除,则从群组、用户等管理集合中移除连接。
- 更新统计:更新全局和连接级别的关闭统计信息。
- 设置状态:标记连接为已关闭和已移除。
重连处理:对于客户端配置,如果连接被关闭且不需要移除,则将连接加入重连队列,尝试重新连接。
异常处理:捕获并记录关闭过程中的异常,确保系统稳定性。
技术细节解析
资源管理
在关闭连接时,确保所有相关资源(如缓冲区、引用等)被正确释放,防止内存泄漏和资源浪费。
状态同步
通过设置标志位(如
isClosed
、isRemoved
)确保连接状态的一致性,避免并发环境下的状态混乱。重连机制
对于客户端连接,在连接被关闭后,根据重连配置决定是否尝试重新连接,增强系统的容错性和可用性。
监听器回调
提供钩子机制,通过 AIO 监听器的回调方法,让开发者在连接关闭前执行自定义逻辑,如资源清理、日志记录等。
多线程支持
在多线程环境下,通过使用
ReentrantLock
和Condition
,确保关闭操作的线程安全性。
示例运行与日志分析
基于用户提供的日志,以下是一个请求的处理流程解析。
测试请求
curl http://localhost:8000/ok
处理显示的日志
2024-12-06 18:23:40.134 [Thread-2] INFO c.l.t.s.AcceptCompletionHandler.completed:68 - new connection:127.0.0.1,42972
2024-12-06 18:23:40.134 [Thread-2] INFO c.l.t.c.t.DecodeTask.decode:41 - decode:127.0.0.1:42972
2024-12-06 18:23:40.134 [Thread-2] INFO c.l.t.c.t.HandlePacketTask.handler:62 - handle:127.0.0.1:42972_281494
2024-12-06 18:23:40.135 [Thread-1] INFO c.l.t.c.ReadCompletionHandler.completed:104 - close 127.0.0.1:42972, because The connection closed by peer
2024-12-06 18:23:40.135 [Thread-1512] INFO c.l.t.c.WriteCompletionHandler.completed:57 - write 172 to 127.0.0.1:42972
2024-12-06 18:23:40.135 [Thread-1] INFO c.l.t.c.Tio.close:455 - close 127.0.0.1:42972,remark:The connection closed by peer
日志解析
连接建立
2024-12-06 18:23:40.134 [Thread-2] INFO c.l.t.s.AcceptCompletionHandler.completed:68 - new connection:127.0.0.1,42972
- 事件:新连接建立。
- 信息:客户端 IP 为
127.0.0.1
,端口为42972
。 - 处理类:
AcceptCompletionHandler
的completed()
方法。
数据解码
2024-12-06 18:23:40.134 [Thread-2] INFO c.l.t.c.t.DecodeTask.decode:41 - decode:127.0.0.1:42972
- 事件:接收到的数据开始解码。
- 信息:来自
127.0.0.1:42972
的数据正在解码。 - 处理类:
DecodeTask
的decode()
方法。
业务包处理
2024-12-06 18:23:40.134 [Thread-2] INFO c.l.t.c.t.HandlePacketTask.handler:62 - handle:127.0.0.1:42972_281494
- 事件:业务包开始处理。
- 信息:处理来自
127.0.0.1:42972
的业务包,包 ID 为281494
。 - 处理类:
HandlePacketTask
的handler()
方法。
读取完成,连接关闭
2024-12-06 18:23:40.135 [Thread-1] INFO c.l.t.c.ReadCompletionHandler.completed:104 - close 127.0.0.1:42972, because The connection closed by peer
- 事件:连接读取完成,且对端关闭连接。
- 信息:因为对端关闭连接,
127.0.0.1:42972
的连接被关闭。 - 处理类:
ReadCompletionHandler
的completed()
方法。
写入完成
2024-12-06 18:23:40.135 [Thread-1512] INFO c.l.t.c.WriteCompletionHandler.completed:57 - write 172 to 127.0.0.1:42972
- 事件:数据写入完成。
- 信息:写入
172
字节到127.0.0.1:42972
。 - 处理类:
WriteCompletionHandler
的completed()
方法。
连接关闭处理
2024-12-06 18:23:40.135 [Thread-1] INFO c.l.t.c.Tio.close:455 - close 127.0.0.1:42972,remark:The connection closed by peer
- 事件:连接正式关闭。
- 信息:关闭
127.0.0.1:42972
的连接,备注信息为The connection closed by peer
。 - 处理类:
Tio
类的close()
方法。
技术细节解析
连接生命周期
- 建立:通过
AcceptCompletionHandler
接受新连接,创建ChannelContext
,并开始异步读取数据。 - 数据处理:读取到数据后,通过
ReadCompletionHandler
进行解码,并交由HandlePacketTask
处理业务逻辑。 - 连接关闭:对端关闭连接后,
ReadCompletionHandler
触发关闭流程,调用Tio.close()
进行连接的清理和资源释放。
- 建立:通过
异步操作与回调
- 非阻塞通信:使用 Java AIO 的异步通道,实现高效的非阻塞网络通信。
- 回调机制:通过实现
CompletionHandler
接口,处理异步操作完成后的回调,实现事件驱动的通信模型。
错误处理
- 读取异常:在读取过程中,如果出现异常,调用
Tio.close()
关闭连接,确保系统的稳定性。 - 写入异常:在写入过程中,如果出现异常,同样调用
Tio.close()
关闭连接。
- 读取异常:在读取过程中,如果出现异常,调用
统计与监控
- 连接统计:统计已接收连接数、当前连接数、发送和接收的字节数等,提供系统的运行状态监控。
- 心跳检测:通过心跳机制,检测连接的活跃性,及时关闭失效连接。
总结与技术要点
本文基于提供的代码片段,详细解析了 Tio 框架的核心配置、服务器启动与监听、连接接收处理、数据读取与解码、消息处理、网络通信管理、异常处理与连接关闭等关键组件和功能。通过深入理解这些组件的结构和交互方式,开发者可以更好地应用 Tio 框架,构建高性能、稳定的网络应用。
关键技术要点
- 高效的异步通信:基于 Java AIO 的异步通道,实现高并发下的高效网络通信。
- 灵活的配置管理:通过
TioConfig
和ServerTioConfig
,提供丰富的配置选项,满足不同场景需求。 - 心跳与连接检测:通过心跳机制和统计信息,确保连接的活跃性和系统的稳定性。
- IP 黑名单与安全:支持 IP 黑名单功能,增强系统的安全性,防止恶意连接。
- 消息编码与解码:通过 AIO 处理器和解码任务,灵活处理不同协议和业务逻辑的数据包。
- 群组与用户管理:提供群组、用户、令牌等多种连接管理方式,适应复杂的业务需求。
- 异常处理与日志记录:统一的异常处理机制和详细的日志记录,帮助开发者快速定位和解决问题。
- 资源管理与性能优化:通过缓冲区池和并发集合,优化资源管理和系统性能,减少内存开销。
通过深入理解 Tio 框架的这些关键技术要点,开发者可以充分发挥其优势,构建高性能、稳定、安全的网络应用。