引入maven依赖
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.0</version>
</dependency>

1.Mqtt
@Component
public class Mqtt implements CommandLineRunner {
@Autowired
private CommitUserInfo commitUserInfol;
@Autowired
private UserInfoDao userInfoDao; // 数据库CRUD接口
@Autowired
private ApplicationRedis applicationRedis; // redis 接口
@Autowired
JudgeEquipmentService judgeEquipmentService; // 查询/修改---当前设备号码状态
@Override
public void run(String... args) throws Exception {
applicationRedis.test();
MqttConnectionUtils.start(userInfoDao,applicationRedis,judgeEquipmentService,commitUserInfol);
}
}2.MqttConnectionUtils
// Shared Paho client; built and connected once in the static initializer below.
private static MqttClient client;
// Connection options used for the initial connect and reused by reConnect().
private static MqttConnectOptions connectOptions;
// Topic name read from PropertiesReader.getTopic(); used for the subscription and the will in start().
private static String TOPIC;
// Random client identifier (a UUID with the dashes stripped) handed to the MqttClient constructor.
private static String clientId;
private static final Logger LOG = LogManager.getLogger(MqttConnectionUtils.class);
/*
 * One-time set-up of the shared MQTT client: generates a client id, resolves the
 * topic, builds the connect options and performs the initial blocking connect.
 *
 * A failed connect is logged rather than rethrown, so class loading still
 * succeeds; client may then be unconnected (or null if construction itself
 * failed) when publish()/start() are called.
 *
 * NOTE(review): cleanSession(false) combined with a fresh random client id on
 * every start means a previous persistent session is never resumed - confirm
 * this is intended.
 */
static {
    try {
        // UUID.toString() contains no whitespace, so only the dashes need removing.
        clientId = UUID.randomUUID().toString().replace("-", "");
        // Resolve the topic before connecting so it is available even when the
        // first connect fails and reConnect() succeeds later.
        TOPIC = PropertiesReader.getTopic();
        client = new MqttClient("tcp://****:1883", clientId);
        connectOptions = new MqttConnectOptions();
        connectOptions.setCleanSession(false);
        connectOptions.setUserName("用户名");
        connectOptions.setPassword(PropertiesReader.getPassword().toCharArray()); // password
        // This call had been swallowed by the trailing comment on the previous line
        // and therefore never executed; the value is in seconds.
        connectOptions.setConnectionTimeout(10);
        client.setTimeToWait(10000);
        client.connect(connectOptions);
    } catch (MqttException e) {
        LOG.error("MQTT initial connect failed", e);
    }
}
/**
 * Publishes a text payload to the given topic at QoS 1.
 *
 * <p>The payload is encoded as UTF-8 so the bytes on the wire do not depend on
 * the JVM's default charset. The success line is logged only after
 * {@code client.publish} has returned without throwing.
 *
 * @param topic   topic to publish to
 * @param content message body
 * @throws MqttException if the client fails to publish the message
 */
public static void publish(String topic, String content) throws MqttException {
    // Fully qualified because this snippet's import list is not shown.
    MqttMessage message = new MqttMessage(content.getBytes(java.nio.charset.StandardCharsets.UTF_8));
    message.setQos(1);
    SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); // timestamp format
    LOG.info("发送时间========" + df.format(new Date()));
    client.publish(topic, message);
    // Log the original string rather than the message object so the log text does
    // not go through a platform-charset decode.
    LOG.info(topic + "主题发送成功,内容:" + content);
}
/**
 * Starts receiving messages: registers the will on the shared connect options,
 * installs the callback and subscribes to {@code TOPIC} at QoS 1.
 *
 * <p>The callback is installed <em>before</em> subscribing so that messages
 * delivered right after the subscription is acknowledged (for example retained
 * ones) are not dropped for lack of a handler.
 *
 * @param userInfoDao           database access object handed to the callback
 * @param applicationRedis      Redis access object handed to the callback
 * @param judgeEquipmentService device-state service handed to the callback
 * @param commitUserInfol       collaborator handed to the callback
 * @throws MqttException if the subscription fails
 */
public static void start(UserInfoDao userInfoDao, ApplicationRedis applicationRedis,
        JudgeEquipmentService judgeEquipmentService, CommitUserInfo commitUserInfol) throws MqttException {
    MqttTopic topic = client.getTopic(TOPIC);
    // setWill: lets other parties learn that this client dropped offline by
    // registering a last-will message ("close", QoS 2, retained).
    // NOTE(review): the initial connect has already happened in the static
    // initializer, so these options only take effect on a later connect that
    // reuses connectOptions (e.g. reConnect()) - confirm that is acceptable.
    connectOptions.setWill(topic, "close".getBytes(), 2, true);
    // Install the callback first, then subscribe.
    client.setCallback(new PushCallback(userInfoDao, applicationRedis, judgeEquipmentService, commitUserInfol));
    String[] topicFilters = { TOPIC };
    int[] qos = { 1 };
    client.subscribe(topicFilters, qos);
    LOG.info("WIFI版启动成功=================");
}
/**
* mqtt重连
*/
public static void reConnect() {
while (true){
try {
if (null != client && !(client.isConnected())) {
Thread.sleep(1000);
clientId = UUID.randomUUID().toString().trim().replaceAll("-", "");
client.connect(connectOptions);
LOG.info("=======尝试重新连接==============");
break;
}
} catch (MqttException | InterruptedException e) {
LOG.info("=======重新连接失败:{}==============", e.toString());
continue;
}
}
}3.PushCallback
import com.alibaba.fastjson.JSONObject;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.beans.factory.annotation.Autowired;
import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
import java.util.*;
/**
 * Paho {@link MqttCallback} that receives subscribed messages and forwards them
 * to {@link #perform(String, JSONObject)}.
 *
 * <p>Instances are created with {@code new} and receive their collaborators
 * through the constructor, so the fields carry no injection annotations.
 */
public class PushCallback implements MqttCallback {
    public static PushCallback pushCallback;

    private final UserInfoDao userInfoDao; // database CRUD interface
    private final CommitUserInfo commitUserInfol;
    private final ApplicationRedis applicationRedis; // Redis interface
    JudgeEquipmentService judgeEquipmentService; // query / update the state of the current device number

    /**
     * Creates a callback bound to the given collaborators.
     *
     * @param userInfoDao           database access object
     * @param applicationRedis      Redis access object
     * @param judgeEquipmentService device-state service
     * @param commitUserInfol       collaborator stored for use by the business logic
     */
    public PushCallback(UserInfoDao userInfoDao, ApplicationRedis applicationRedis,
            JudgeEquipmentService judgeEquipmentService, CommitUserInfo commitUserInfol) {
        this.userInfoDao = userInfoDao;
        this.applicationRedis = applicationRedis;
        this.judgeEquipmentService = judgeEquipmentService;
        this.commitUserInfol = commitUserInfol;
    }

    /**
     * Invoked when the connection to the broker is lost; reports the cause and
     * delegates to {@code MqttConnectionUtils.reConnect()}.
     *
     * @param throwable reason the connection was lost
     */
    @Override
    public void connectionLost(Throwable throwable) {
        System.out.println("WIFI版======连接断开,可以做重连");
        // Keep the cause visible instead of discarding it.
        System.err.println("MQTT connection lost: " + throwable);
        MqttConnectionUtils.reConnect();
    }

    /**
     * Invoked for every message received on a subscribed topic.
     *
     * <p>Payloads equal to {@code "close"} (the will message registered in
     * {@code MqttConnectionUtils.start}) are ignored. Every other payload is
     * decoded as UTF-8, parsed as a JSON object and passed to
     * {@link #perform(String, JSONObject)}.
     *
     * @param topic   topic the message arrived on
     * @param message received message
     * @throws Exception declared by the interface; processing failures are caught here
     */
    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {
        String messages = new String(message.getPayload(), StandardCharsets.UTF_8);
        if ("close".equals(messages)) {
            return;
        }
        System.out.println("接收消息主题 : " + topic);
        System.out.println("接收消息Qos : " + message.getQos());
        System.out.println("接收消息内容 : " + messages);
        try {
            // The original snippet passed an undeclared variable named json;
            // parsing the payload is the presumed intent - confirm.
            JSONObject json = JSONObject.parseObject(messages);
            perform(topic, json);
        } catch (Exception e) {
            // Deliberately not rethrown: per the Paho MqttCallback contract an
            // exception escaping messageArrived shuts the client down. Report it
            // instead of swallowing it silently.
            System.err.println("Failed to process MQTT message on topic " + topic + ": " + e);
            e.printStackTrace();
        }
    }

    /**
     * Invoked when delivery of an outgoing message has completed.
     *
     * @param iMqttDeliveryToken token of the delivered message
     */
    @Override
    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
        System.out.println("deliveryComplete---------" + iMqttDeliveryToken.isComplete());
    }

    /**
     * Extension point for the business logic applied to each received message.
     *
     * @param topicP topic the message arrived on
     * @param json   parsed message payload
     * @throws MqttException                declared for implementations that publish a reply
     * @throws UnsupportedEncodingException declared for implementations that re-encode text
     */
    public void perform(String topicP, JSONObject json) throws MqttException, UnsupportedEncodingException {
        // your business logic goes here
    }
}


版权声明:本文为qq_37996327原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。