maven配置
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.0</version>
</dependency>
客户端代码
package com.geniuses.mqtt;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.stereotype.Component;
@Component
public class client1 {
//主题名
String topic = "tets1";
//QoS服务质量等级
int qos = 1;
//账号
String userName = "admin";
//密码
String password = "public";
String clientId = "Client1";
// 内存存储
MemoryPersistence persistence = new MemoryPersistence();
//访问服务器地址
private String broker = "tcp://127.0.0.1:1883";
// 创建客户端
MqttClient sampleClient = new MqttClient(broker, clientId, persistence);
public client1() throws MqttException {
}
//初始化设置订阅的回调
public void init() {
sampleClient.setCallback(new MqttCallback() {
@Override
public void connectionLost(Throwable throwable) {
}
//当有订阅的消息时会从这里接收
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
System.out.println("clien1收到主题为:" + topic + "的消息。------\n" + "消息为::" + new String(message.getPayload()));
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
System.out.println("deliveryComplete---------" + token.isComplete());
}
});
}
///
public void connect() throws MqttException {
// 创建链接参数
MqttConnectOptions connOpts = new MqttConnectOptions();
// 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,这里设置为true表示每次连接到服务器都以新的身份连接
connOpts.setCleanSession(true);
// 设置连接的用户名
connOpts.setUserName(userName);
connOpts.setPassword(password.toCharArray());
// 建立连接
IMqttToken iMqttToken = sampleClient.connectWithResult(connOpts);
boolean r = iMqttToken.isComplete();
if (r) {
System.out.println("client1连接到服务器成功");
} else {
System.out.println("client1连接到服务器失败");
}
}
public void publish(String mes) throws MqttException {
// 创建消息
MqttMessage message = new MqttMessage(mes.getBytes());
// 设置消息的服务质量
message.setQos(qos);
// 发布消息
sampleClient.publish(topic, message);
}
public void subscribe(String topic) throws MqttException {
//订阅消息
sampleClient.subscribe(topic, qos);
}
///
public void disconnect() throws MqttException {
// 断开连接
sampleClient.disconnect();
// 关闭客户端
sampleClient.close();
}
}
客户端2
package com.geniuses.mqtt;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.stereotype.Component;
@Component
public class client2 {
//主题名
String topic = "test2";
//QoS服务质量等级
int qos = 1;
//账号
String userName = "admin";
//密码
String password = "public";
String clientId = "Client2";
// 内存存储
MemoryPersistence persistence = new MemoryPersistence();
//访问服务器地址
private String broker = "tcp://127.0.0.1:1883";
// 创建客户端
MqttClient sampleClient = new MqttClient(broker, clientId, persistence);
public client2() throws MqttException {
}
//初始化设置订阅的回调
public void init() {
sampleClient.setCallback(new MqttCallback() {
@Override
public void connectionLost(Throwable throwable) {
}
//当有订阅的消息时会从这里接收
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
System.out.println("clien2收到主题为:" + topic + "的消息。------\n" + "消息为::" + new String(message.getPayload()));
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
System.out.println("deliveryComplete---------" + token.isComplete());
}
});
}
///
public void connect() throws MqttException {
// 创建链接参数
MqttConnectOptions connOpts = new MqttConnectOptions();
// 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,这里设置为true表示每次连接到服务器都以新的身份连接
connOpts.setCleanSession(true);
// 设置连接的用户名
connOpts.setUserName(userName);
connOpts.setPassword(password.toCharArray());
// 建立连接
IMqttToken iMqttToken = sampleClient.connectWithResult(connOpts);
boolean r = iMqttToken.isComplete();
if (r) {
System.out.println("client2连接到服务器成功");
} else {
System.out.println("client2连接到服务器失败");
}
}
public void publish(String mes) throws MqttException {
// 创建消息
MqttMessage message = new MqttMessage(mes.getBytes());
// 设置消息的服务质量
message.setQos(qos);
// 发布消息
sampleClient.publish(topic, message);
}
public void subscribe(String topic) throws MqttException {
//订阅消息
sampleClient.subscribe(topic, qos);
}
///
public void disconnect() throws MqttException {
// 断开连接
sampleClient.disconnect();
// 关闭客户端
sampleClient.close();
}
}
客户端3
package com.geniuses.mqtt;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.stereotype.Component;
@Component
public class client3 {
//主题名
String topic = "test3";
//QoS服务质量等级
int qos = 1;
//账号
String userName = "admin";
//密码
String password = "public";
String clientId = "Client3";
// 内存存储
MemoryPersistence persistence = new MemoryPersistence();
//访问服务器地址
private String broker = "tcp://127.0.0.1:1883";
// 创建客户端
MqttClient sampleClient = new MqttClient(broker, clientId, persistence);
public client3() throws MqttException {
}
//初始化设置订阅的回调
public void init() {
sampleClient.setCallback(new MqttCallback() {
@Override
public void connectionLost(Throwable throwable) {
}
//当有订阅的消息时会从这里接收
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
System.out.println("clien3收到主题为:" + topic + "的消息。------\n" + "消息为::" + new String(message.getPayload()));
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
System.out.println("deliveryComplete---------" + token.isComplete());
}
});
}
///
public void connect() throws MqttException {
// 创建链接参数
MqttConnectOptions connOpts = new MqttConnectOptions();
// 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,这里设置为true表示每次连接到服务器都以新的身份连接
connOpts.setCleanSession(true);
// 设置连接的用户名
connOpts.setUserName(userName);
connOpts.setPassword(password.toCharArray());
// 建立连接
IMqttToken iMqttToken = sampleClient.connectWithResult(connOpts);
boolean r = iMqttToken.isComplete();
if (r) {
System.out.println("client3连接到服务器成功");
} else {
System.out.println("client3连接到服务器失败");
}
}
public void publish(String mes) throws MqttException {
// 创建消息
MqttMessage message = new MqttMessage(mes.getBytes());
// 设置消息的服务质量
message.setQos(qos);
// 发布消息
sampleClient.publish(topic, message);
}
public void subscribe(String topic) throws MqttException {
//订阅消息
sampleClient.subscribe(topic, qos);
}
///
public void disconnect() throws MqttException {
// 断开连接
sampleClient.disconnect();
// 关闭客户端
sampleClient.close();
}
}
controller测试类
package com.geniuses.mqtt;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.JSONPObject;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
import java.text.SimpleDateFormat;
import java.time.LocalDateTime;
import java.util.Date;
@RestController
@RequestMapping("mqttDemo")
public class MqttDemoController {
@Autowired
public client1 c1;
@Autowired
public client2 c2;
@Autowired
public client3 c3;
@RequestMapping("init")//先让用户client1、client2、client3连接上服务器
public void init() throws MqttException {
c1.init();
c1.connect();
c2.init();
c2.connect();
c3.init();
c3.connect();
}
@RequestMapping("client1/sub/{topic}")//让用户client1订阅一个话题
public void client1_sub( @PathVariable String topic) throws MqttException {
c1.subscribe(topic);
System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())+
"---client1订阅了名为:"+topic+"的。");
}
@RequestMapping("client1/pub/{mes}")//让用户主题client1发布一个消息(话题默认为"test1")
public void client1_pub( @PathVariable String mes) throws MqttException {
System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
c1.publish(mes);
}
@PostMapping("saveAlgorithmData")
public void saveAlgorithmData(@RequestBody test params) throws MqttException {
// test algorithmTypeDic = JSONObject.parseObject(params, test.class);
String a= JSONObject.toJSONString(params);
c1.publish(a);
}
@RequestMapping("client2/sub/{topic}")//让用户client2订阅一个话题
public void client2_sub( @PathVariable String topic) throws MqttException {
c2.subscribe(topic);
System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())+
"---client2订阅了名为:"+topic+"的主题。");
}
@RequestMapping("client2/pub/{mes}")//让用户client2发布一个消息(话题默认为"test2")
public void client2_pub( @PathVariable String mes) throws MqttException {
System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
c2.publish(mes);
}
@RequestMapping("client3/sub/{topic}")//让用户client3订阅一个话题
public void client3_sub( @PathVariable String topic) throws MqttException {
c3.subscribe(topic);
System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())+
"---client3订阅了名为:"+topic+"的主题。");
}
@RequestMapping("client3/pub/{mes}")//让用户client3发布一个消息(话题默认为"test3")
public void client_pub( @PathVariable String mes) throws MqttException {
System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
c3.publish(mes);
}
}
在地址栏输入 项目地址 请求 init接口 初始化即可
版权声明:本文为qq_43684538原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。