java springboot+MQTT协议 最完整版

引入maven依赖

  <dependency>
            <groupId>org.eclipse.paho</groupId>
            <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
            <version>1.2.0</version>
        </dependency>
一、Mqtt
@Component
public class Mqtt implements CommandLineRunner {


    @Autowired
    private CommitUserInfo commitUserInfol;
    @Autowired
    private UserInfoDao userInfoDao;  // 数据库CRUD接口
    @Autowired
    private ApplicationRedis applicationRedis;  // redis  接口
    @Autowired
    JudgeEquipmentService judgeEquipmentService; // 查询/修改---当前设备号码状态

    @Override
    public void run(String... args) throws Exception {
        applicationRedis.test();
        MqttConnectionUtils.start(userInfoDao,applicationRedis,judgeEquipmentService,commitUserInfol);

    }



}

2.MqttConnectionUtils

 


private static MqttClient client;

    private static MqttConnectOptions connectOptions;

    private static String TOPIC;

    private static String clientId;

    private static final Logger LOG = LogManager.getLogger(MqttConnectionUtils.class);
    static {
        try {
            clientId = UUID.randomUUID().toString().trim().replaceAll("-", "");
            client = new MqttClient("tcp://****:1883",clientId);
            connectOptions=new MqttConnectOptions();
            connectOptions.setCleanSession(false);
            connectOptions.setUserName("用户名");
            connectOptions.setPassword(PropertiesReader.getPassword().toCharArray());//密码            connectOptions.setConnectionTimeout(10);
            client.setTimeToWait(10000);
            client.connect(connectOptions);
            TOPIC = PropertiesReader.getTopic();
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }


 /**
     * 发送数据
     */

    public static void publish(String topic,String content) throws MqttException {
        MqttMessage message=new MqttMessage(content.getBytes());
        message.setQos(1);
        SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");//设置日期格式
        LOG.info("发送时间========"+df.format(new Date()));
        LOG.info(topic+"主题发送成功,内容:"+message);
        client.publish(topic,message);
    }

/**
     * 接收数据
     */

public static void start(UserInfoDao userInfoDao, ApplicationRedis applicationRedis,
                             JudgeEquipmentService judgeEquipmentService, CommitUserInfo commitUserInfol) throws MqttException {
        MqttTopic topic = client.getTopic(TOPIC);
        // setWill方法,如果项目中需要知道客户端是否掉线可以调用该方法。设置最终端口的通知消息
        connectOptions.setWill(topic, "close".getBytes(), 2, true);

        // 订阅消息
        int[] Qos = { 1 };
        String[] topic1 = { TOPIC };
        client.subscribe(topic1, Qos);
        // 设置回调
        client.setCallback(new PushCallback(userInfoDao,applicationRedis,judgeEquipmentService,commitUserInfol));
        LOG.info("WIFI版启动成功=================");
    }


 /**
     * mqtt重连
     */
    public static void reConnect() {
        while (true){
            try {
                if (null != client && !(client.isConnected())) {
                    Thread.sleep(1000);
                    clientId = UUID.randomUUID().toString().trim().replaceAll("-", "");
                    client.connect(connectOptions);
                    LOG.info("=======尝试重新连接==============");
                    break;
                }
            } catch (MqttException | InterruptedException e) {
                LOG.info("=======重新连接失败:{}==============", e.toString());
                continue;
            }
        }

    }

3.PushCallback



import com.alibaba.fastjson.JSONObject;

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.beans.factory.annotation.Autowired;

import java.io.UnsupportedEncodingException;
import java.util.*;

public class PushCallback implements MqttCallback{

    public static PushCallback pushCallback;

    @Autowired
    private UserInfoDao userInfoDao;  // 数据库CRUD接口

    @Autowired
    private CommitUserInfo commitUserInfol;

    @Autowired
    private ApplicationRedis applicationRedis;  // redis  接口
    @Autowired
    JudgeEquipmentService judgeEquipmentService; // 查询/修改---当前设备号码状态



    public PushCallback(UserInfoDao userInfoDao,ApplicationRedis applicationRedis,
                        JudgeEquipmentService judgeEquipmentService, CommitUserInfo commitUserInfol){
        this.userInfoDao = userInfoDao;
        this.applicationRedis = applicationRedis;
        this.judgeEquipmentService = judgeEquipmentService;
        this.commitUserInfol = commitUserInfol;
    }
    @Override
    public void connectionLost(Throwable throwable) {
        // 连接丢失后,一般在这里面进行重连
        System.out.println("WIFI版======连接断开,可以做重连");
        MqttConnectionUtils.reConnect();
    }

    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {
        // subscribe后得到的消息会执行到这里面
        String messages = new String(message.getPayload());
        if(!messages.equals("close")){
            System.out.println("接收消息主题 : " + topic);
            System.out.println("接收消息Qos : " + message.getQos());
            System.out.println("接收消息内容 : " + new String(message.getPayload()));
            try {
                
                    perform(topic,json);
                
            }catch (Exception e){

            }

        }


    }

    @Override
    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
        System.out.println("deliveryComplete---------" +iMqttDeliveryToken.isComplete());
      }

      public  void perform(String topicP,JSONObject json) throws MqttException, UnsupportedEncodingException {
          //你的业务模块
      }


}

 


版权声明:本文为qq_37996327原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。