Java实现MQTT订阅端,可订阅通配符(/#)
在做MQTT订阅端时报错:
The topic name MUST NOT contain any wildcard characters (#+)
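提示Topic中不能包含通配符。原因:通配符(`#`、`+`)只能用于订阅时的主题过滤器(`client.subscribe(...)`),不能用于发布消息时的主题名;把带通配符的字符串传给 `client.getTopic(...)` 用于发布时,Paho 就会抛出上面的异常。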
- 主函数
PublishSubscribe.java
package Test;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import java.util.Map;
import java.util.UUID;
/**
 * MQTT publish and subscribe demo built on the Eclipse Paho client.
 *
 * <p>The subscriber uses the wildcard filter {@code capability/#}. The publisher uses a
 * concrete topic name, because wildcards are only legal in subscription filters; passing a
 * wildcard to {@code MqttClient.getTopic} fails with
 * "The topic name MUST NOT contain any wildcard characters (#+)".
 *
 * @author wzq
 */
public class PublishSubscribe {
    private static final String serviceURI = "tcp://192.168.32.160:1883";
    // Client id used by the subscriber.
    private static final String clientID = UUID.randomUUID().toString();
    // The publisher gets its own id: a broker drops an existing connection when a second
    // client connects with the same id, so sharing one id makes the two evict each other.
    private static final String publisherClientID = UUID.randomUUID().toString();
    private static final MqttClientPersistence persistence = new MemoryPersistence();
    // If the MQTT broker allows anonymous access, no user name or password is needed to
    // subscribe or publish.
    private static final String username = "admin";
    private static final String password = "123456";
    // Subscription filter; '#' matches every topic level below "capability/".
    private static final String topic = "capability/#";
    // Concrete topic used for publishing. It must not contain '#' or '+', and it is chosen
    // so that it matches the subscription filter above.
    private static final String publishTopic = "capability/demo";
    /*
     * Quality of service, one of three levels:
     * 0: at most once. A message may be lost but is never delivered twice.
     * 1: at least once. A message is never lost but may be delivered more than once.
     * 2: exactly once. Every message is delivered once and only once.
     */
    private static final int qos = 0;

    /**
     * Connects as a publisher and publishes "hello,world-N" messages to the concrete
     * publish topic in an endless loop. The loop only ends when publishing fails; the
     * exception is then printed and the client is disconnected and closed.
     *
     * @author wzq
     **/
    public static void publish() {
        MqttClient client = null;
        try {
            // Separate persistence instance: the publisher must not share the subscriber's store.
            client = new MqttClient(serviceURI, publisherClientID, new MemoryPersistence());
            MqttConnectOptions connectOptions = new MqttConnectOptions();
            // No credentials are set for the publisher (anonymous broker access is assumed).
            // The client id is random per run, so a persistent session could never be
            // resumed and would only be left behind on the broker.
            connectOptions.setCleanSession(true);
            // Connect the publisher to the broker.
            client.connect(connectOptions);
            System.out.println("发布者连接状态: " + client.isConnected());
            MqttTopic mqttTopic = client.getTopic(publishTopic);
            int i = 1;
            String message = "hello,world-";
            while (true) {
                String payLoad = message + i++;
                // Fresh message per publish; encode explicitly instead of relying on the
                // platform default charset.
                MqttMessage mqttMessage =
                        new MqttMessage(payLoad.getBytes(java.nio.charset.StandardCharsets.UTF_8));
                mqttMessage.setQos(qos);
                MqttDeliveryToken deliveryToken = mqttTopic.publish(mqttMessage);
                try {
                    // A token that is not yet complete is still in flight, not failed:
                    // wait for the outcome before reporting it.
                    deliveryToken.waitForCompletion();
                } catch (MqttException e) {
                    System.out.println("发布者发布消息: " + payLoad + " 失败");
                    throw e;
                }
                System.out.println("发布者发布消息: " + payLoad + " 成功");
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (client != null) {
                try {
                    if (client.isConnected()) {
                        client.disconnect();
                    }
                    client.close();
                } catch (MqttException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    // Result holder returned by subscribe(); nothing in this class assigns it.
    static Map<String, Object> map1;

    /**
     * Connects as a subscriber, registers a {@link PushCallback} and subscribes to the
     * wildcard filter. Incoming messages are delivered to the callback; the client is
     * intentionally left connected after this method returns.
     *
     * @return {@code map1} if it has been set, otherwise an empty immutable map (never null)
     * @author wzq
     **/
    public static Map<String, Object> subscribe() {
        try {
            MqttClient client = new MqttClient(serviceURI, clientID, persistence);
            client.setCallback(new PushCallback());
            MqttConnectOptions connectOptions = new MqttConnectOptions();
            connectOptions.setUserName(username);
            connectOptions.setPassword(password.toCharArray());
            // Random client id per run: a persistent session could never be resumed.
            connectOptions.setCleanSession(true);
            // Connect the subscriber and subscribe to the topic filter.
            client.connect(connectOptions);
            client.subscribe(topic, qos);
            System.out.println("订阅者连接状态: " + client.isConnected());
        } catch (MqttException e) {
            e.printStackTrace();
        }
        if (map1 == null) {
            return java.util.Collections.<String, Object>emptyMap();
        }
        return map1;
    }

    /**
     * Prints the given topic and message to standard output.
     *
     * @param topic   topic name to print
     * @param message message text to print
     */
    public static void anay(String topic, String message) {
        System.out.println("topic:" + topic);
        System.out.println("message:" + message);
    }
}
- 回调函数
PushCallback.java
package Test;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
public class PushCallback implements MqttCallback {
static int num=0;
@Override
public void connectionLost(Throwable arg0) {
// TODO Auto-generated method stub
System.out.println("订阅者连接丢失...");
System.out.println(arg0.getMessage());
}
@Override
public void deliveryComplete(IMqttDeliveryToken arg0) {
// TODO Auto-generated method stub
}
@Override
public void messageArrived(String arg0, MqttMessage arg1) throws Exception {
// TODO Auto-generated method stub
System.out.println("订阅者接收到主题: " + arg0.toString());
System.out.println("订阅者接收到消息: " + arg1.toString());
anay(arg0.toString(), arg1.toString());
}
public static void anay(String topic,String message){
num++;
if(num>1){
JsonParser parser = new JsonParser() ;
JsonObject object=(JsonObject) parser.parse(message);
object.addProperty("topic", topic);
Client.array.add(object);
System.out.println(Client.array);
}
}
}
- 测试
Client.java
package Test;
import java.util.Map;
import com.google.gson.JsonArray;
/**
 * Test entry point: starts the MQTT subscriber. Received JSON messages are collected in
 * {@link #array} by {@code PushCallback}.
 */
public class Client {
    // Messages collected by PushCallback.anay().
    static JsonArray array = new JsonArray();

    /**
     * Starts the subscriber.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        // subscribe() is static, so it is called on the class; its return value is not used here.
        PublishSubscribe.subscribe();
    }
}
下载链接
关注即可下载:
https://download.csdn.net/download/lzl980111/13126310
参考优秀博客:
https://blog.csdn.net/q1165328963/article/details/105516611
https://blog.csdn.net/paladinzh/article/details/88030238
版权声明:本文为lzl980111原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。