WebSocket Client

WebSocket Client 并不是 tio-boot 的内置组件

1. 依赖添加

首先,您需要在项目的pom.xml文件中添加以下依赖,以使用tio-websocket-client库。

<dependency>
  <groupId>com.litongjava</groupId>
  <artifactId>tio-websocket-client</artifactId>
  <version>3.7.3.v20241004-RELEASE</version>
</dependency>

这个依赖是 WebSocket 客户端的核心,提供了建立连接、发送和接收消息等功能。

2. WebSocket 客户端测试

主要组件和流程:

  1. 消息发送跟踪: 使用ConcurrentHashMap来存储和跟踪每条消息的发送状态。
  2. 消息确认机制: 使用 RxJava 的SubjectPublishSubject来处理消息确认。当所有消息都确认发送后,会打印出“All sent success!”。
  3. WebSocket 客户端配置:
    • onOpen:连接打开时的回调。
    • onMessage:接收到消息时的回调。更新消息状态,并打印接收到的消息。
    • onClose:连接关闭时的回调。
    • onError:出现错误时的回调。
    • onThrows:异常处理。
  4. 连接建立: 使用WsClient.create创建 WebSocket 客户端,并通过connect方法建立连接。
  5. 消息发送: 循环发送一定数量的消息,并打印发送状态。

普通写法

package demo;

import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;

import com.litongjava.tio.websocket.client.WebSocket;
import com.litongjava.tio.websocket.client.WebsocketClient;
import com.litongjava.tio.websocket.client.config.WebsocketClientConfig;
import com.litongjava.tio.websocket.client.event.CloseEvent;
import com.litongjava.tio.websocket.client.event.ErrorEvent;
import com.litongjava.tio.websocket.client.event.MessageEvent;
import com.litongjava.tio.websocket.client.event.OpenEvent;
import com.litongjava.tio.websocket.common.WebSocketPacket;

import io.reactivex.functions.Consumer;
import io.reactivex.subjects.PublishSubject;
import io.reactivex.subjects.Subject;

public class TioWebSocketDemo {

  public static void main(String[] args) throws Exception {
    Map<Long, Boolean> sent = new ConcurrentHashMap<>();
    int total = 1000;
    String uri = "ws://127.0.0.1:9326/hello";

    // 用于等待连接建立
    CountDownLatch openLatch = new CountDownLatch(1);
    // 用于等待所有消息接收完毕
    CountDownLatch latch = new CountDownLatch(total);

    // 定义接收消息的处理逻辑
    Consumer<? super List<Object>> onNext = new Consumer<List<Object>>() {
      @Override
      public void accept(List<Object> x) throws Exception {
        Boolean all = sent.values().stream().reduce(true, (p, c) -> p && c);
        if (all) {
          System.out.println("All sent success! ");
          // 不需要在这里减少计数,因为我们在接收消息的回调中已经处理了
        }
      }
    };

    // 创建一个用于消息处理的主题
    Subject<Object> complete = PublishSubject.create().toSerialized();
    // 订阅消息处理逻辑
    complete.buffer(total).subscribe(onNext);

    // 定义 WebSocket 事件处理函数
    java.util.function.Consumer<OpenEvent> onOpen = new java.util.function.Consumer<OpenEvent>() {
      @Override
      public void accept(OpenEvent e) {
        System.out.println("Connection opened");
        // 连接建立后,释放阻塞主线程的锁
        openLatch.countDown();
      }
    };

    java.util.function.Consumer<MessageEvent> onMessage = new java.util.function.Consumer<MessageEvent>() {
      @Override
      public void accept(MessageEvent e) {
        WebSocketPacket data = e.data;
        Long id = data.getId();
        String wsBodyText = data.getWsBodyText();
        sent.put(id, true);
        System.out.println("Received: " + wsBodyText);
        complete.onNext(id);
        latch.countDown(); // 每次接收到消息后减少计数
      }
    };

    java.util.function.Consumer<CloseEvent> onClose = new java.util.function.Consumer<CloseEvent>() {
      @Override
      public void accept(CloseEvent e) {
        System.out.printf("Connection closed: %d, %s, %s\n", e.code, e.reason, e.wasClean);
      }
    };

    java.util.function.Consumer<ErrorEvent> onError = new java.util.function.Consumer<ErrorEvent>() {
      @Override
      public void accept(ErrorEvent e) {
        System.out.println(String.format("Error occurred: %s", e.msg));
      }
    };

    java.util.function.Consumer<Throwable> onThrows = new java.util.function.Consumer<Throwable>() {
      @Override
      public void accept(Throwable t) {
        t.printStackTrace();
      }
    };

    // 配置 WebSocket 客户端
    WebsocketClientConfig wsClientConfig = new WebsocketClientConfig(onOpen, onMessage, onClose, onError, onThrows);

    // 创建 WebSocket 客户端
    WebsocketClient client = WebsocketClient.create(uri, wsClientConfig);

    // 连接到服务器
    WebSocket ws = client.connect();

    // 等待连接建立
    openLatch.await();

    // 在连接建立后发送消息
    for (int i = 0; i < total; i++) {
      ws.send("" + i);
      System.out.println("Sent: " + i);
    }

    // 等待所有消息接收完毕
    latch.await();
    System.out.println("All messages have been received.");
  }
}

精简写法

package demo;

import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.function.Consumer;

import com.litongjava.tio.websocket.client.WebSocket;
import com.litongjava.tio.websocket.client.WebsocketClient;
import com.litongjava.tio.websocket.client.config.WebsocketClientConfig;
import com.litongjava.tio.websocket.client.event.CloseEvent;
import com.litongjava.tio.websocket.client.event.ErrorEvent;
import com.litongjava.tio.websocket.client.event.MessageEvent;
import com.litongjava.tio.websocket.client.event.OpenEvent;
import com.litongjava.tio.websocket.common.WebSocketPacket;

import io.reactivex.subjects.PublishSubject;
import io.reactivex.subjects.Subject;

public class TioWebSocketDemo {

  public static void main(String[] args) throws Exception {
    Map<Long, Boolean> sent = new ConcurrentHashMap<>();
    int total = 1000;
    String uri = "ws://127.0.0.1:9326/hello";

    // 用于等待连接建立
    CountDownLatch openLatch = new CountDownLatch(1);
    // 用于等待所有消息发送完毕
    CountDownLatch latch = new CountDownLatch(total);

    // 定义接收消息的处理逻辑
    io.reactivex.functions.Consumer<? super List<Object>> onNext = x -> {
      Boolean all = sent.values().stream().reduce(true, (p, c) -> p && c);
      if (all) {
        System.out.println("All sent success! ");
        // latch.countDown(); // 不需要在这里减少计数,因为我们在接收消息的回调中已经处理了
      }
    };

    // 创建一个用于消息处理的主题
    Subject<Object> complete = PublishSubject.create().toSerialized();
    // 订阅消息处理逻辑
    complete.buffer(total).subscribe(onNext);

    // 定义 WebSocket 事件处理函数
    Consumer<OpenEvent> onOpen = e -> {
      System.out.println("Connection opened");
      // 连接建立后,释放阻塞主线程的锁
      openLatch.countDown();
    };

    Consumer<MessageEvent> onMessage = e -> {
      WebSocketPacket data = e.data;
      Long id = data.getId();
      String wsBodyText = data.getWsBodyText();
      sent.put(id, true);
      System.out.println("Received: " + wsBodyText);
      complete.onNext(id);
      latch.countDown(); // 每次接收到消息后减少计数
    };

    Consumer<CloseEvent> onClose = e -> System.out.printf("Connection closed: %d, %s, %s\n", e.code, e.reason, e.wasClean);
    Consumer<ErrorEvent> onError = e -> System.out.println(String.format("Error occurred: %s", e.msg));
    Consumer<Throwable> onThrows = Throwable::printStackTrace;

    // 配置 WebSocket 客户端
    WebsocketClientConfig wsClientConfig = new WebsocketClientConfig(onOpen, onMessage, onClose, onError, onThrows);

    // 创建 WebSocket 客户端
    WebsocketClient client = WebsocketClient.create(uri, wsClientConfig);

    // 连接到服务器
    WebSocket ws = client.connect();

    // 等待连接建立
    openLatch.await();

    // 在连接建立后发送消息
    for (int i = 0; i < total; i++) {
      ws.send("" + i);
      System.out.println("Sent: " + i);
    }

    // 等待所有消息接收完毕
    latch.await();
    System.out.println("All messages have been received.");
  }
}