Java MQTT订阅端,可订阅通配符(/#)

Java实现MQTT订阅端,可订阅通配符(/#)

在做MQTT订阅端时报错:

The topic name MUST NOT contain any wildcard characters (#+)

提示Topic中不能包含通配符

  • 主函数

PublishSubscribe.java

package Test;

import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

import java.util.Map;
import java.util.UUID;


 
/**
 * mqtt的发布和订阅
 *
 * @author wzq
 */
public class PublishSubscribe {
 
 
    private static String serviceURI = "tcp://192.168.32.160:1883";
    private static String clientID = UUID.randomUUID().toString();
    private static MqttClientPersistence persistence = new MemoryPersistence();
    //如果mqtt服务配置了匿名访问,则不需要使用用户名和密码就可以实现消息的订阅和发布
    private static String username = "admin";
    private static String password = "123456";
    private static String topic = "capability/#";
    /*
        消息服务质量,一共有三个:
        0:尽力而为。消息可能会丢,但绝不会重复传输
        1:消息绝不会丢,但可能会重复传输
        2:恰好一次。每条消息肯定会被传输一次且仅传输一次
     */
    private static int qos = 0;
 
    /**
     * 消息发布
     *
     * @author wzq
     **/
    public static void publish() {
        try {
            MqttClient client = new MqttClient(serviceURI, clientID, persistence);
            MqttConnectOptions connectOptions = new MqttConnectOptions();
//            connectOptions.setUserName(username);
//            connectOptions.setPassword(password.toCharArray());
            connectOptions.setCleanSession(false);
            //发布者连接服务
            client.connect(connectOptions);
            System.out.println("发布者连接状态: " + client.isConnected());
            MqttTopic mqttTopic = client.getTopic(topic);
            //MqttMessage mqttMessage = new MqttMessage(message.getBytes());
            MqttMessage mqttMessage = new MqttMessage();
            mqttMessage.setQos(qos);
            int i = 1;
            String message = "hello,world-";
            while (true) {
                String payLoad = message + i++;
                mqttMessage.setPayload(payLoad.getBytes());
                MqttDeliveryToken deliveryToken = mqttTopic.publish(mqttMessage);
                if (!deliveryToken.isComplete()) {
                    System.out.println("发布者发布消息: " + payLoad + " 失败");
                    deliveryToken.waitForCompletion();
                } else {
                    System.out.println("发布者发布消息: " + payLoad + " 成功");
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
 
    /**
     * 消息订阅
     *
     * @author wzq
     **/
    static Map<String, Object> map1;
    public static Map<String, Object> subscribe() {
    	
        try {
            MqttClient client = new MqttClient(serviceURI, clientID, persistence);
            client.setCallback(new PushCallback());
            MqttConnectOptions connectOptions = new MqttConnectOptions();
            connectOptions.setUserName(username);
            connectOptions.setPassword(password.toCharArray());
            connectOptions.setCleanSession(false);
            //订阅者连接订阅主题
            client.connect(connectOptions);
            client.subscribe(topic, qos);
            System.out.println("订阅者连接状态: " + client.isConnected());

        } catch (MqttException e) {
            e.printStackTrace();
        }
        return map1;
    }
 
    public static void anay(String topic,String message){

    	System.out.println("topic:"+topic);
    	System.out.println("message:"+message);
    	String topic_content=topic;
    	String message_content=message;

    }
}
  • 回调函数

PushCallback.java

package Test;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;

import com.google.gson.JsonObject;
import com.google.gson.JsonParser;

public class PushCallback implements MqttCallback {

	static int num=0;

	@Override
	public void connectionLost(Throwable arg0) {
		// TODO Auto-generated method stub
        System.out.println("订阅者连接丢失...");
        System.out.println(arg0.getMessage());
	}

	@Override
	public void deliveryComplete(IMqttDeliveryToken arg0) {
		// TODO Auto-generated method stub

	}

	@Override
	public void messageArrived(String arg0, MqttMessage arg1) throws Exception {
		// TODO Auto-generated method stub
    	System.out.println("订阅者接收到主题: " + arg0.toString());
        System.out.println("订阅者接收到消息: " + arg1.toString());
        anay(arg0.toString(), arg1.toString());
        
	}
    public static void anay(String topic,String message){
    	num++;
    	if(num>1){
    		JsonParser parser = new JsonParser() ;
    		JsonObject object=(JsonObject) parser.parse(message);
    		object.addProperty("topic", topic);
    		Client.array.add(object);
    		System.out.println(Client.array);
    	}
    }

}

  • 测试

client.java

package Test;

import java.util.Map;

import com.google.gson.JsonArray;

public class Client {

	static JsonArray array = new JsonArray();
	public static void main(String[] args) {
		PublishSubscribe ps=new PublishSubscribe();
		Map<String, Object> map=ps.subscribe();
	}
}

下载链接

关注即可下载:

https://download.csdn.net/download/lzl980111/13126310

参考优秀博客:

https://blog.csdn.net/q1165328963/article/details/105516611
https://blog.csdn.net/paladinzh/article/details/88030238


版权声明:本文为lzl980111原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。