WebSocket 服务器
tio-boot
内置了 tio-websocket
组件,推荐使用 tio-boot
创建 WebSocket 服务器。如果您对 WebSocket 场景有特殊的要求,可以参考下面的代码启动一个独立的 WebSocket 服务器。
添加依赖
在项目的 pom.xml
中添加以下依赖:
<dependencies>
<!-- tio-websocket-server:创建 WebSocket 服务器的主要依赖 -->
<dependency>
<groupId>com.litongjava</groupId>
<artifactId>tio-websocket-server</artifactId>
<version>3.7.3.v20241005-RELEASE</version>
</dependency>
<!-- logback-classic:用于日志记录 -->
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.2.3</version>
</dependency>
<!-- Lombok:用于简化 Java 代码 -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.30</version>
<optional>true</optional>
<scope>provided</scope>
</dependency>
</dependencies>
定义常量
创建一个 Const
类,用于存储服务器中使用的常量:
package com.litongjava;
import com.litongjava.model.time.Time;
public class Const {
/**
* 群聊的组 ID
*/
public static final String GROUP_ID = "g_001";
/**
* 应用名称(可自行命名,主要用于开发者辨识)
*/
public static final String APP_NAME = "websocket-server";
/**
* 字符编码
*/
public static final String CHARSET = "utf-8";
/**
* 监听的 IP(为 null 表示监听所有 IP)
*/
public static final String SERVER_IP = null;
/**
* 监听的端口
*/
public static final int SERVER_PORT = 9326;
/**
* 心跳超时时间,单位:毫秒(0 表示不超时)
*/
public static final int HEARTBEAT_TIMEOUT = 0;
/**
* IP 数据监控统计的时间段
*/
public static interface IpStatDuration {
public static final Long DURATION_1 = Time.MINUTE_1 * 5;
public static final Long[] IPSTAT_DURATIONS = new Long[] { DURATION_1 };
}
}
实现 IP 统计监听器
创建一个 MyIpStatListener
类,实现 IpStatListener
接口,用于处理 IP 统计相关的事件:
package com.litongjava;
import com.litongjava.aio.Packet;
import com.litongjava.tio.core.ChannelContext;
import com.litongjava.tio.core.TioConfig;
import com.litongjava.tio.core.stat.IpStat;
import com.litongjava.tio.core.stat.IpStatListener;
public class MyIpStatListener implements IpStatListener {
@Override
public void onExpired(TioConfig tioConfig, IpStat ipStat) {
// 处理 IP 统计过期事件
}
@Override
public void onAfterConnected(ChannelContext channelContext, boolean isConnected, boolean isReconnect, IpStat ipStat) throws Exception {
// 处理连接后事件
}
@Override
public void onDecodeError(ChannelContext channelContext, IpStat ipStat) {
// 处理解码错误事件
}
@Override
public void onAfterSent(ChannelContext channelContext, Packet packet, boolean isSentSuccess, IpStat ipStat) throws Exception {
// 处理数据发送后事件
}
@Override
public void onAfterDecoded(ChannelContext channelContext, Packet packet, int packetSize, IpStat ipStat) throws Exception {
// 处理数据解码后事件
}
@Override
public void onAfterReceivedBytes(ChannelContext channelContext, int receivedBytes, IpStat ipStat) throws Exception {
// 处理接收字节后事件
}
@Override
public void onAfterHandled(ChannelContext channelContext, Packet packet, IpStat ipStat, long cost) throws Exception {
// 处理数据处理后事件
}
}
提示:目前这些方法都是空实现,您可以根据需求在其中添加具体的逻辑。
实现服务器监听器
创建一个 MyServerAioListener
类,继承自 WebSocketServerAioListener
,用于处理服务器的各种事件,如连接、消息发送和客户端断开等:
package com.litongjava;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.litongjava.aio.Packet;
import com.litongjava.tio.core.ChannelContext;
import com.litongjava.tio.core.Tio;
import com.litongjava.tio.websocket.common.WebSocketResponse;
import com.litongjava.tio.websocket.common.WebSocketSessionContext;
import com.litongjava.tio.websocket.server.WebSocketServerAioListener;
/**
* 根据需要实现该类
*/
public class MyServerAioListener extends WebSocketServerAioListener {
private static Logger log = LoggerFactory.getLogger(MyServerAioListener.class);
@Override
public void onAfterConnected(ChannelContext channelContext, boolean isConnected, boolean isReconnect) throws Exception {
super.onAfterConnected(channelContext, isConnected, isReconnect);
if (log.isInfoEnabled()) {
log.info("onAfterConnected:{}", channelContext);
}
}
@Override
public void onAfterSent(ChannelContext channelContext, Packet packet, boolean isSentSuccess) throws Exception {
super.onAfterSent(channelContext, packet, isSentSuccess);
if (log.isInfoEnabled()) {
log.info("onAfterSent:{},{}", packet.logstr(), channelContext);
}
}
@Override
public void onBeforeClose(ChannelContext channelContext, Throwable throwable, String remark, boolean isRemove) throws Exception {
super.onBeforeClose(channelContext, throwable, remark, isRemove);
if (log.isInfoEnabled()) {
log.info("onBeforeClose:{}", channelContext);
}
WebSocketSessionContext wsSessionContext = (WebSocketSessionContext) channelContext.get();
if (wsSessionContext != null && wsSessionContext.isHandshaked()) {
int count = Tio.getAll(channelContext.tioConfig).getObj().size();
String msg = channelContext.getClientNode().toString() + " 离开了,现在共有【" + count + "】人在线";
// 创建文本消息
WebSocketResponse wsResponse = WebSocketResponse.fromText(msg, Const.CHARSET);
// 向组内所有客户端发送消息
Tio.sendToGroup(channelContext.tioConfig, Const.GROUP_ID, wsResponse);
}
}
@Override
public void onAfterDecoded(ChannelContext channelContext, Packet packet, int packetSize) throws Exception {
super.onAfterDecoded(channelContext, packet, packetSize);
if (log.isInfoEnabled()) {
log.info("onAfterDecoded:{},{}", packet.logstr(), channelContext);
}
}
@Override
public void onAfterReceivedBytes(ChannelContext channelContext, int receivedBytes) throws Exception {
super.onAfterReceivedBytes(channelContext, receivedBytes);
if (log.isInfoEnabled()) {
log.info("onAfterReceivedBytes:{}", channelContext);
}
}
@Override
public void onAfterHandled(ChannelContext channelContext, Packet packet, long cost) throws Exception {
super.onAfterHandled(channelContext, packet, cost);
if (log.isInfoEnabled()) {
log.info("onAfterHandled:{},{}", packet.logstr(), channelContext);
}
}
}
在 onBeforeClose
方法中,当客户端断开连接时,服务器会向组内所有客户端发送一条消息,通知他们有客户端离开。
实现 WebSocket 消息处理器
创建一个 HelloWebSocketHandler
类,实现 IWsMsgHandler
接口,用于处理 WebSocket 消息:
package com.litongjava;
import com.litongjava.tio.core.ChannelContext;
import com.litongjava.tio.websocket.common.WsRequest;
import com.litongjava.tio.websocket.common.WsResponse;
import com.litongjava.tio.websocket.server.handler.IWsMsgHandler;
public class HelloWebSocketHandler implements IWsMsgHandler {
@Override
public Object onText(WsRequest wsRequest, String text, ChannelContext channelContext) throws Exception {
// 处理文本消息,这里简单地将收到的消息原样返回
WsResponse wsResponse = WsResponse.fromText(text, Const.CHARSET);
return wsResponse;
}
@Override
public Object onClose(WsRequest wsRequest, byte[] bytes, ChannelContext channelContext) throws Exception {
// 处理连接关闭事件
return null;
}
@Override
public Object onBinary(WsRequest wsRequest, byte[] bytes, ChannelContext channelContext) throws Exception {
// 处理二进制消息
return null;
}
@Override
public Object onBytes(WsRequest wsRequest, byte[] bytes, ChannelContext channelContext) throws Exception {
// 处理字节消息
return null;
}
@Override
public Object onPing(WsRequest wsRequest, byte[] bytes, ChannelContext channelContext) throws Exception {
// 处理 ping 消息
return null;
}
@Override
public Object onPong(WsRequest wsRequest, byte[] bytes, ChannelContext channelContext) throws Exception {
// 处理 pong 消息
return null;
}
}
说明:在这个处理器中,您可以根据需求实现对不同类型 WebSocket 消息的处理逻辑。
启动 WebSocket 服务器
创建一个 WebsocketServerDemo
类,包含 main
方法,用于启动服务器:
package com.litongjava;
import java.io.IOException;
import com.litongjava.tio.server.ServerTioConfig;
import com.litongjava.tio.websocket.server.WsServerStarter;
public class WebsocketServerDemo {
public static void main(String[] args) {
long start = System.currentTimeMillis();
// 创建 WebSocket 消息处理器实例
HelloWebSocketHandler helloWebSocketHandler = new HelloWebSocketHandler();
WsServerStarter wsServerStarter = null;
try {
// 初始化 WebSocket 服务器启动器
wsServerStarter = new WsServerStarter(Const.SERVER_PORT, helloWebSocketHandler);
} catch (IOException e) {
e.printStackTrace();
return;
}
ServerTioConfig serverTioConfig = wsServerStarter.getServerTioConfig();
serverTioConfig.setName(Const.APP_NAME);
serverTioConfig.setServerAioListener(new MyServerAioListener());
// 设置 IP 统计监听器
serverTioConfig.setIpStatListener(new MyIpStatListener());
// 设置心跳超时时间
serverTioConfig.setHeartbeatTimeout(Const.HEARTBEAT_TIMEOUT);
// 如果您希望通过 wss 访问,请配置 SSL(需提供与域名匹配的 SSL 证书)
/*
String keyStoreFile = "classpath:config/ssl/keystore.jks";
String trustStoreFile = "classpath:config/ssl/keystore.jks";
String keyStorePwd = "您的密钥库密码";
serverTioConfig.useSsl(keyStoreFile, trustStoreFile, keyStorePwd);
*/
// 初始化服务器配置
serverTioConfig.init();
// 设置 IP 统计的时间段
serverTioConfig.ipStats.addDurations(Const.IpStatDuration.IPSTAT_DURATIONS);
try {
// 启动服务器
wsServerStarter.start();
} catch (IOException e) {
e.printStackTrace();
return;
}
long end = System.currentTimeMillis();
System.out.println("服务器启动耗时:" + (end - start) + " 毫秒");
}
}
测试服务器
配置完成后,您可以使用 WebSocket 客户端连接到服务器,并发送消息进行测试。确保客户端连接到 ws://<服务器IP>:9326
,并观察服务器的日志输出和客户端的消息接收情况。