EMQX

本文用来设置和运行一个基于 MQTT 协议的客户端程序,利用 Eclipse Paho MQTT 客户端库和 tio-boot 框架进行消息发布和订阅。

Maven 依赖

首先,<dependency>部分是 Maven 构建配置,用于在你的项目中包含 Eclipse Paho MQTT 客户端库。

<dependency>
  <groupId>org.eclipse.paho</groupId>
  <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
  <version>1.2.5</version>
</dependency>

app.properties

eqmx 配置

emqx.broker=tcp://192.168.3.9:1883
emqx.username=username
emqx.password=password
emqx.topic=test/topic
emqx.qos=2

HelloApp 类

这是一个使用 Litongjava Tio 框架启动应用程序的主类。@AComponentScan注解自动扫描和加载组件。

@AComponentScan
public class HelloApp {
  public static void main(String[] args) {
    long start = System.currentTimeMillis();
    TioApplication.run(HelloApp.class, args);
    long end = System.currentTimeMillis();
    System.out.println((end - start) + "ms");
  }
}

SampleCallback 类

这是一个实现了 MqttCallback 接口的类。它提供了三个方法来处理与 MQTT 消息相关的事件:

  1. connectionLost(Throwable cause): 当连接丢失时调用。
  2. messageArrived(String topic, MqttMessage message): 当收到新消息时调用。
  3. deliveryComplete(IMqttDeliveryToken token): 当消息成功发送后调用。
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;

public class SampleCallback implements MqttCallback {
  // 连接丢失
  public void connectionLost(Throwable cause) {
    System.out.println("connection lost:" + cause.getMessage());
  }

  // 收到消息
  public void messageArrived(String topic, MqttMessage message) {
    System.out.println("Received message: \n  topic:" + topic + "\n  Qos:" + message.getQos() + "\n  payload:"
        + new String(message.getPayload()));
  }

  // 消息传递成功
  public void deliveryComplete(IMqttDeliveryToken token) {
    System.out.println("deliveryComplete");
  }
}

MqttClientUtils 类

这个工具类用于简化 MQTT 客户端的操作。它提供了保存客户端、发布消息等功能。

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;

public class MqttClientUtils {
  private static MqttClient client;
  private static String topic;
  private static int qos;

  public static void init(MqttClient client, String topic, int qos) {
    MqttClientUtils.client = client;
    MqttClientUtils.topic = topic;
    MqttClientUtils.qos = qos;
  }

  public static MqttClient getClient() {
    return client;
  }

  public static void publish(MqttMessage message) {
    message.setQos(qos);
    try {
      client.publish(topic, message);
    } catch (MqttException e) {
      e.printStackTrace();
    }
  }

  public static void publishWithQos(MqttMessage message) {
    try {
      client.publish(topic, message);
    } catch (MqttException e) {
      e.printStackTrace();
    }
  }
}

EmqxClientConfig 类

这是一个配置类,用于初始化 MQTT 客户端和连接设置。它使用 EnvUtils 从环境或配置文件中获取配置,并初始化一个 MQTT 客户端实例。该实例配置了连接选项、回调处理以及订阅的 topic。此外,还有一个销毁方法,用于在应用程序关闭时正确地断开 MQTT 客户端的连接。

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

import com.litongjava.jfinal.aop.annotation.AConfiguration;
import com.litongjava.jfinal.aop.annotation.AInitialization;
import com.litongjava.tio.boot.server.TioBootServer;
import com.litongjava.tio.utils.environment.EnvUtils;
import com.litongjava.tio.web.hello.utils.MqttClientUtils;

import lombok.extern.slf4j.Slf4j;

@AConfiguration
@Slf4j
public class EmqxClientConfig {
  @AInitialization
  public void config() {
    String broker = EnvUtils.get("emqx.broker");
    String username = EnvUtils.get("emqx.username");
    String password = EnvUtils.get("emqx.password");
    String topic = EnvUtils.get("emqx.topic");

    int qos = EnvUtils.getInt("emqx.qos", 2);

    String clientId = MqttClient.generateClientId();
    // 持久化
    MemoryPersistence persistence = new MemoryPersistence();
    // MQTT 连接选项
    MqttConnectOptions connOpts = new MqttConnectOptions();
    // 设置认证信息
    connOpts.setUserName(username);
    connOpts.setPassword(password.toCharArray());
    try {
      MqttClient client = new MqttClient(broker, clientId, persistence);
      // 设置回调
      client.setCallback(new SampleCallback());
      // 建立连接
      log.info("Connecting to broker: " + broker);
      client.connect(connOpts);
      log.info("Connected to broker: " + broker);
      // 订阅 topic
      client.subscribe(topic, qos);
      log.info("Subscribed to topic: " + topic);

      TioBootServer.me().addDestroyMethod(() -> {
        try {
          client.disconnect();
          log.info("Disconnected");
          client.close();
        } catch (MqttException e) {
          e.printStackTrace();
        }
      });

      MqttClientUtils.init(client,topic,qos);

    } catch (MqttException me) {
      log.error("reason " + me.getReasonCode());
      log.error("msg " + me.getMessage());
      log.error("loc " + me.getLocalizedMessage());
      log.error("cause " + me.getCause());
      log.error("excep " + me);
      me.printStackTrace();
    }
  }
}

IndexController 类

这是一个简单的控制器,用于处理 HTTP 请求。当访问应用程序的根路径时,它会发布一个 "Hello World" 消息到 MQTT 服务器。

import org.eclipse.paho.client.mqttv3.MqttMessage;

import com.litongjava.tio.http.server.annotation.RequestPath;
import com.litongjava.tio.web.hello.utils.MqttClientUtils;

@RequestPath("/")
public class IndexController {
  @RequestPath()
  public String index() {
    String content = "Hello World";
    MqttMessage message = new MqttMessage(content.getBytes());
    MqttClientUtils.publish(message);
    return "index";
  }
}

发布消息

使用 MqttClientUtils 类来发布消息到 MQTT 服务器。

// 发布消息
MqttClient client = MqttClientUtils.getClient();
String content = "Hello World";
MqttMessage message = new MqttMessage(content.getBytes());
message.setQos(qos);
client.publish(topic, message);
log.info("Message published");

主要涉及使用 Eclipse Paho MQTT 客户端库进行消息发布和订阅,以及使用 tio-boot 框架构建的 Web 应用程序。它演示了如何配置 MQTT 客户端,处理连接丢失、消息到达和消息传递完成的事件,发布和订阅消息,并通过 HTTP 接口触发消息发布。