Mica-mqtt

添加依赖

<mica-mqtt.version>2.2.6</mica-mqtt.version>
<dependency>
  <groupId>net.dreamlu</groupId>
  <artifactId>mica-mqtt-client</artifactId>
  <version>${mica-mqtt.version}</version>
</dependency>
package com.litongjava.mica.mqtt.client.config;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tio.core.ChannelContext;

import net.dreamlu.iot.mqtt.core.client.IMqttClientConnectListener;

/**
 * Client Connection Status Listening
 */
public class MqttClientConnectListener implements IMqttClientConnectListener {
  private static final Logger logger = LoggerFactory.getLogger(MqttClientConnectListener.class);

  @Override
  public void onConnected(ChannelContext context, boolean isReconnect) {
    if (isReconnect) {
      logger.info("Reconnect mqtt server reconnected successfully");
    } else {
      logger.info("Connection to mqtt server successful");
    }
  }

  @Override
  public void onDisconnect(ChannelContext channelContext, Throwable throwable, String remark, boolean isRemove) {
    logger.error("mqtt link broken remark:{} isRemove:{}", remark, isRemove, throwable);
  }

}
package com.litongjava.mica.mqtt.client.config;

import java.nio.charset.StandardCharsets;
import java.util.Timer;
import java.util.TimerTask;

import org.tio.core.ChannelContext;

import com.litongjava.jfinal.aop.annotation.Bean;
import com.litongjava.jfinal.aop.annotation.Configuration;

import lombok.extern.slf4j.Slf4j;
import net.dreamlu.iot.mqtt.codec.MqttPublishMessage;
import net.dreamlu.iot.mqtt.codec.MqttQoS;
import net.dreamlu.iot.mqtt.core.client.IMqttClientMessageListener;
import net.dreamlu.iot.mqtt.core.client.MqttClient;
import net.dreamlu.iot.mqtt.core.client.MqttClientCreator;

@AConfiguration
@Slf4j
public class MicaMQTTClientConfig {

  @ABean
  public MqttClient MqttClient() {
    // 初始化 mqtt 客户端
    MqttClientCreator creator = MqttClient.create().ip("192.168.3.9").port(1883).username("mica").password("mica");
    // 连接监听
    MqttClient client = creator.connectListener(new MqttClientConnectListener()).willMessage(builder -> {
      builder.topic("/test/offline").messageText("down").retain(false).qos(MqttQoS.AT_MOST_ONCE); // 遗嘱消息
    })
        // 同步连接,也可以使用 connect() 异步(可以避免 broker 没启动照成启动卡住),但是下面的订阅和发布可能还没连接成功。
        .connectSync();

    // 订阅
    client.subQos0("/test/123", new IMqttClientMessageListener() {
      @Override
      public void onSubscribed(ChannelContext context, String topicFilter, MqttQoS mqttQoS) {
        // 订阅成功之后触发,可在此处做一些业务逻辑
        log.info("topicFilter:{} MqttQoS:{} Subscription successful", topicFilter, mqttQoS);
      }

      @Override
      public void onMessage(ChannelContext context, String topic, MqttPublishMessage message, byte[] payload) {
        log.info(topic + '\t' + new String(payload, StandardCharsets.UTF_8));
      }
    });

    // 发送
    Timer timer = new Timer();
    timer.schedule(new TimerTask() {
      @Override
      public void run() {
        client.publish("/test/client", "hello this is mica client".getBytes(StandardCharsets.UTF_8));
      }
    }, 1000, 2000);

    return client;
  }
}

这段代码主要涉及两个 Java 类,它们是用于配置和实现 MQTT 客户端的功能。让我们逐个进行解释:

MqttClientConnectListener 类

这个类实现了 IMqttClientConnectListener 接口,用于监听 MQTT 客户端的连接状态。

  • 方法:
    • onConnected: 当客户端成功连接到 MQTT 服务器时调用。如果是重新连接(isReconnecttrue),则记录“重连成功”的消息;否则,记录“连接成功”的消息。
    • onDisconnect: 当客户端与 MQTT 服务器的连接断开时调用。记录断开连接的详细信息和错误(如果有)。

MicaMQTTClientConfig 类

这个类使用了 JFinal 的 @AConfiguration@ABean 注解,表明它是一个配置类,用于初始化和配置 MQTT 客户端。

  • 方法:
    • MqttClient: 定义了一个 MqttClient Bean。在这个方法中,执行了以下操作:
      • 初始化 MQTT 客户端:使用 MqttClient.create() 创建一个 MQTT 客户端实例,并设置了服务器的 IP 地址、端口、用户名和密码。
      • 设置连接监听器:添加了前面定义的 MqttClientConnectListener 实例作为连接监听器。
      • 配置遗嘱消息:设置了客户端的遗嘱消息,当客户端意外断开连接时,服务器将会发布这条消息到指定的主题(/test/offline)。
      • 同步连接:调用 connectSync() 方法来同步连接到 MQTT 服务器。
      • 订阅主题:订阅了 /test/123 主题,并定义了如何处理订阅成功事件和收到的消息。
      • 定时发送消息:使用 Timer 定时向 /test/client 主题发送消息。

这段代码通过 JFinal AOP 提供的注解方式配置了一个 MQTT 客户端。客户端在启动时会自动连接到 MQTT 服务器,订阅指定的主题,并定期向另一个主题发送消息。同时,它通过实现连接监听器来记录连接和断开事件。这样的设置在 IoT(物联网)应用中很常见,用于设备与 MQTT 服务器之间的通信。