话不多说直接上代码:
生产者:
回调函数
/**
* MQTT 推送回调
*
* @author wunaozai
* @date 2018-08-22
*/
public class MqttPushCallback implements MqttCallback {
private static final Logger log = LoggerFactory.getLogger(MqttPushCallback.class);
@Override
public void connectionLost(Throwable cause) {
//log.info("断开连接,建议重连" + this);
//断开连接,建议重连
// new MqttPushClient();
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
//log.info("推送完成" + token.getMessage());
log.info("推送完成" + token.isComplete() + "");
}
@Override
public void messageArrived(String topic, MqttMessage message) {
log.info("Topic: " + topic);
}
}
private static final Logger log = LoggerFactory.getLogger(MqttPushClient.class);
public static String MQTT_HOST = "tcp://114.116.90.167:1883";
public static String MQTT_CLIENTID = "client_id";
public static String MQTT_USERNAME = "user"; //admin
public static String MQTT_PASSWORD = "user65789"; //admin53421
public static int MQTT_TIMEOUT = 10;
public static int MQTT_KEEPALIVE = 10;
private MqttClient client;
private static volatile MqttPushClient mqttClient = null;
public static MqttPushClient getInstance() {
if(mqttClient == null) {
synchronized (MqttPushClient.class) {
if(mqttClient == null) {
mqttClient = new MqttPushClient();
}
}
}
return mqttClient;
}
public MqttPushClient() {
log.info("Connect MQTT: " + this);
connect();
}
public void connect() {
try {
client = new MqttClient(MQTT_HOST, MQTT_CLIENTID, new MemoryPersistence());
MqttConnectOptions option = new MqttConnectOptions();
option.setCleanSession(false);
option.setUserName(MQTT_USERNAME);
option.setPassword(MQTT_PASSWORD.toCharArray());
option.setConnectionTimeout(MQTT_TIMEOUT);
option.setKeepAliveInterval(MQTT_KEEPALIVE);
option.setAutomaticReconnect(true);
try {
client.setCallback(new MqttPushCallback());
client.connect(option);
} catch (Exception e) {
e.printStackTrace();
}
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 发布主题,用于通知<br>
* 默认qos为1 非持久化 retined:false 非持久化 true 持久化
* @param topic
* @param data
*/
public void publish(String topic, String data) {
publish(topic, data, 1, false);
}
/**
* 发布
* @param topic
* @param data
* @param qos
* @param retained
*/
public void publish(String topic, String data, int qos, boolean retained) {
MqttMessage message = new MqttMessage();
message.setQos(qos);
message.setRetained(retained);
message.setPayload(data.getBytes());
MqttTopic mqttTopic = client.getTopic(topic);
if(null == mqttTopic) {
log.error("Topic Not Exist");
}
MqttDeliveryToken token;
try {
token = mqttTopic.publish(message);
token.waitForCompletion();
log.info("MQTT推送消息成功,message is:[{}]",message);
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 订阅某个主题 qos默认为1
* @param topic
*/
public void subscribe(String topic) {
subscribe(topic, 2);
}
/**
* 订阅某个主题
* @param topic
* @param qos
*/
public void subscribe(String topic, int qos) {
log.info("接受到的消息是————————》》》"+"2222222222222222222222");
try {
log.info("接受到的消息是————————》》》"+"111111111111111111");
log.info("接受到的消息是————————》》》"+client);
log.info("接受到的消息是————————》》》"+client.getClientId());
//client.subscribe(topic, qos);
//IMqttToken token = client.subscribeWithResponse(topic,qos);
//String msg = String.valueOf(token.getResponse().getPayload());
client.subscribe(topic, 2);
// client.subscribe(topic, 2, new IMqttMessageListener() {
// @Override
// public void messageArrived(String topic, MqttMessage message) throws Exception {
// log.info("接受到的消息是————————》》》333333333");
// log.info("接受到的消息是————————》》》"+new String(message.getPayload()));
// }
// });
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 清空主题
*/
public void cleanTopic(String topic) {
if (null != client || client.isConnected()) {
try {
client.unsubscribe(topic);
} catch (MqttException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
} else {
log.info("mqttClient is error");
}
}
}消费者:
/**
* Active mq 服务端,消费
* @author pengheng
* @create 2020-06-02
**/
@Component
public class Topic_Consumer implements ApplicationRunner {
private static final Logger log = LoggerFactory.getLogger(Topic_Consumer.class);
public static String MQTT_USERNAME = "user"; //admin
public static String MQTT_PASSWORD = "user65789"; //admin53421
@Autowired
public BsAppAccountService bsAppAccountService;
@Autowired
private XinGePushService xinGePushService;
@Override
public void run(ApplicationArguments args) throws Exception {
init();
}
public void init() throws JMSException {
ConnectionFactory factory = new ActiveMQConnectionFactory(
// ActiveMQConnectionFactory.DEFAULT_USER,
// ActiveMQConnectionFactory.DEFAULT_PASSWORD,
MQTT_USERNAME,
MQTT_PASSWORD,
"tcp://114.116.90.167:61616"
);
Connection conn = factory.createConnection();
// conn.setClientID("consumer1");
conn.start();
Session session = conn.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
//与生产者的消息目的地相同
Destination dest = session.createTopic(ContentPrefixConst.getMerchantAppUserIdentity());
MessageConsumer messConsumer = session.createConsumer(dest);
messConsumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message m) {
try{
if(m instanceof TextMessage){ //接收文本消息
TextMessage message = (TextMessage)m;
log.info(message.getText());
}else if(m instanceof MapMessage){ //接收键值对消息
MapMessage message = (MapMessage)m;
log.info(">>"+message.getLong("age"));
log.info(">>"+message.getDouble("sarray"));
log.info(message.getString("username"));
}else if(m instanceof StreamMessage){ //接收流消息
StreamMessage message = (StreamMessage)m;
log.info(message.readString());
log.info(">>"+message.readLong());
}else if(m instanceof BytesMessage){ //接收字节消息
byte[] b = new byte[1024];
int len = -1;
BytesMessage message = (BytesMessage)m;
StringBuffer buffer = new StringBuffer();
while((len=message.readBytes(b))!=-1){
log.info(" =============================>pz =============================>");
log.info(new String(b, 0, len));
buffer.append(new String(b, 0, len));
}
pushXingge(buffer.toString());
}
}catch(JMSException e){
e.printStackTrace();
}
}
});
}
/**
* 推送到xingge
* @param
* @param
*/
public void pushXingge(String message){
JSONObject object = FastJsonUtils.getStringToJson(message);
String merchantId = object.getString("merchantId");
String title = object.getString("title");
String content = object.getString("content");
String id = object.getString("id");
String type = object.getString("type");
Map tmAppMap = bsAppAccountService.selectTmappCount(merchantId);
if (null != tmAppMap) {
if (StringUtils.isNotBlank(String.valueOf(tmAppMap.get("d_token")))) {
String token = String.valueOf(tmAppMap.get("d_token"));
log.info("信鸽推送消息查到的token:" + token);
if (StringUtils.isNotBlank(token)) {
String d_type = String.valueOf(tmAppMap.get("d_type"));
ArrayList<String> token_list = new ArrayList<>();
token_list.add(token);
//要打开的页面调/payment/waterEleReadPayment
xinGePushService.pushSingleDevice(token_list, d_type, title, content, id, type);
}
}
}
log.info("active mq 渠道推送结束");
}
}
注意:
tcp://localhost:8161(管理端口)
tcp://127.0.0.1:61616(服务端口)
端口不弄错了哦!
版权声明:本文为weixin_39677064原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。