用Java编写一个MQTT Client连接到EMQ X订阅主题发布消息,并用node-red进行测试

最近在学习MQTT方面的东西,参照EMQ的文档写了一个MQTT Client,所以写这篇博客记录一下过程。

一. EMQ的安装

本人的EMQ安装在阿里云服务器上,服务器的版本是Ubuntu18.04。具体的安装方式可以参考https://docs.emqx.io/broker/v3/cn/install.html上面有各种版本的安装方法。(阿里云服务器必须在安全组中打开1883端口)

二. 创建Maven项目
  • 打开IDEA新建一个project,选择maven工程,GroupId、ArtifactId根据自己喜好填写即可,填写完后点击next。
    在这里插入图片描述
  • Project name等信息也是根据喜好填写即可,我这里取名为mqtt_client,填写完后点击Finish即可。
    在这里插入图片描述
三. 导入maven依赖

创建好项目后在pom.xml文件中加入java-paho依赖。

<dependency>
	<groupId>org.eclipse.paho</groupId>
	<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
	<version>1.2.0</version>
 </dependency>
四. 创建Demo类并实现MQTT Client
  • 在src目录下的mian目录的java目录中创建一个名为Demo(此名字也可根据个人喜好起)的类。
  • 初始化客户端并与服务器建立连接。
    MqttClient(String serverURI, String clientId, MqttClientPersistence persistence)
    
    MqttClient为构造客户端的构造函数,其参数包括:
    serverURI:EMQ X的服务端地址,此处我的服务端部署在阿里云服务器上,所以为tcp://阿里云服务器ip:1883,如果EMQ X服务端部署在本地的话填tcp://localhost:1883即可。
    clientId:这个是标识客户端的唯一ID,在同一个EMQ X服务器中必须保证唯一,否则会对处理session问题有影响。
    MqttClientPersistence:这个参数传入的是本地消息持久化实例。
  • 建立连接:
    1 设置mqtt broker地址端口号以及客户端ID
    //mqtt broker地址与端口号
    String broker = "tcp://阿里云服务器ip:1883";
    //mqtt客户端id
    String clientId = "JavaClient";
    
    2 创建本地消息的持久化对象。
    MemoryPersistence persistence = new MemoryPersistence();
    
    3 连接broker
    MqttClient mqttClient = new MqttClient(broker, clientId, persistence);
    MqttConnectOptions connOpts = new MqttConnectOptions();
    connOpts.setCleanSession(true);
    System.out.println("Connecting to broker:" + broker);
    mqttClient.connect(connOpts);
    System.out.println("Connected......");
    
    4 编写完成后点击运行,如果控制台输出如下图所示,那么恭喜你,已经成功连接上MQTT服务端了。
    在这里插入图片描述
  • 订阅主题
    成功连接上服务端后,我们可以进行主题的订阅。MqttClient提供了多个订阅的方法,可用不同方式实现主题订阅。订阅主题部分代码如下:
    //主题名称
    String topic = "demo/java";
    System.out.println("Subscribe to topic:" + topic);
    //订阅demo/java主题
    mqttClient.subscribe(topic);
    //设置回调实例
    mqttClient.setCallback(new MqttCallback() {
           public void connectionLost(Throwable throwable) {
    
            }
    
            public void messageArrived(String topic, MqttMessage message) throws Exception {
                String theMsg = MessageFormat.format("{0} is arrived for topic {1}.", new String(message.getPayload()), topic);
                System.out.println(theMsg);
            }
    
            public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
    
            }
     });
    
  • 发布消息
    Mqttclient的publish方法可用于发布消息
    publish(String topic, MqttMessage message)
    
    topic:主题名称
    message:消息内容
    发布消息部分代码如下:
    //定义消息内容
    String content = "Hello I am Java.....";
    //定义qos级别
    int qos = 2;
    System.out.println("Publish message:" + content);
    MqttMessage message = new MqttMessage(content.getBytes());
    //设置qos级别
    message.setQos(qos);
    //发布消息
    mqttClient.publish(topic, message);
    System.out.println("Message published");
    
五. 测试
  1. 这里我们需要另外一个客户端也同时订阅这个主题来测试这个简易的java客户端是否能正常运作,我这里使用node-red来进行辅助测试。关于node-red的安装方法可以参考https://nodered.org/docs/getting-started/local,如果你是win10系统并且已经安装了node.js环境的话直接在cmd输入npm install -g --unsafe-perm node-red然后回车即可。安装完成之后在cmd中输入node-red即可启动,访问http://127.0.0.1:1880/即可进入node-red的界面。
    在这里插入图片描述
  2. 在node-red的web界面中找到网络并将mqtt-in节点拖出来,mqtt in节点是用来连接到MQTT代理并订阅来自指定主题的消息。
    在这里插入图片描述
  3. 设置节点属性。双击节点,在主题中输入我们java client中订阅的主题demo/java,QoS选择2与java client保持一致,然后点击服务端栏右边的编辑按钮编辑服务端信息。
    在这里插入图片描述
  4. broker的名称我这里填写为my broker,这个名称可根据个人喜好自行定义;服务端地址就填写EMQ X的地址就可与了,我这里填的是阿里云服务器的地址,因为我的EMQ X部署在阿里云服务器上。填写完成后点击右上角添加按钮即可。
    在这里插入图片描述
  5. 点击添加后会回跳到编辑mqtt in节点的界面上,在服务端中选择好刚添加的broker后点击完成即可。
  6. 可以利用debug节点来查看mqtt in节点收到的信息。在左侧的共通栏中选择debug节点并拖拽到流程面板中并将mqtt in节点与debug节点连起来。然后点击右上角的debug按钮(小虫子),点击部署。
    在这里插入图片描述
  7. 回到IDEA中,重新运行程序。
    在这里插入图片描述
  8. 此时在node-red的右侧调试窗口即可收到java client发布的消息:Hello I am Java.....
    在这里插入图片描述
    本项目的代码已上传至github:https://github.com/junlinChan/mqtt_client.git,有需要的可以到github上查看。

版权声明:本文为m0_37905997原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。