java 整合MQTT客户端和Windows安装服务端2

maven配置

        <dependency>
            <groupId>org.eclipse.paho</groupId>
            <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
            <version>1.2.0</version>
        </dependency>

客户端代码

package com.geniuses.mqtt;

import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.stereotype.Component;

@Component
public class client1 {
    //主题名
    String topic = "tets1";
    //QoS服务质量等级
    int qos = 1;
    //账号
    String userName = "admin";
    //密码
    String password = "public";
    String clientId = "Client1";
    // 内存存储
    MemoryPersistence persistence = new MemoryPersistence();
    //访问服务器地址
    private String broker = "tcp://127.0.0.1:1883";
    // 创建客户端
    MqttClient sampleClient = new MqttClient(broker, clientId, persistence);
    public client1() throws MqttException {
    }

    //初始化设置订阅的回调
    public void init() {
        sampleClient.setCallback(new MqttCallback() {
            @Override
            public void connectionLost(Throwable throwable) {
            }

            //当有订阅的消息时会从这里接收
            @Override
            public void messageArrived(String topic, MqttMessage message) throws Exception {
                System.out.println("clien1收到主题为:" + topic + "的消息。------\n" + "消息为::" + new String(message.getPayload()));
            }


            @Override
            public void deliveryComplete(IMqttDeliveryToken token) {
                System.out.println("deliveryComplete---------" + token.isComplete());
            }
        });


    }

    ///
    public void connect() throws MqttException {
        // 创建链接参数
        MqttConnectOptions connOpts = new MqttConnectOptions();
        // 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,这里设置为true表示每次连接到服务器都以新的身份连接
        connOpts.setCleanSession(true);
        // 设置连接的用户名
        connOpts.setUserName(userName);
        connOpts.setPassword(password.toCharArray());
        // 建立连接
        IMqttToken iMqttToken = sampleClient.connectWithResult(connOpts);
        boolean r = iMqttToken.isComplete();
        if (r) {
            System.out.println("client1连接到服务器成功");
        } else {
            System.out.println("client1连接到服务器失败");
        }
    }

    public void publish(String mes) throws MqttException {
        // 创建消息
        MqttMessage message = new MqttMessage(mes.getBytes());
        // 设置消息的服务质量
        message.setQos(qos);
        // 发布消息
        sampleClient.publish(topic, message);
    }


    public void subscribe(String topic) throws MqttException {
        //订阅消息
        sampleClient.subscribe(topic, qos);
    }

    ///
    public void disconnect() throws MqttException {
        // 断开连接
        sampleClient.disconnect();
        // 关闭客户端
        sampleClient.close();
    }
}


客户端2

package com.geniuses.mqtt;

import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.stereotype.Component;

@Component
public class client2 {
    //主题名
    String topic = "test2";
    //QoS服务质量等级
    int qos = 1;
    //账号
    String userName = "admin";
    //密码
    String password = "public";
    String clientId = "Client2";
    // 内存存储
    MemoryPersistence persistence = new MemoryPersistence();
    //访问服务器地址
    private String broker = "tcp://127.0.0.1:1883";
    // 创建客户端
    MqttClient sampleClient = new MqttClient(broker, clientId, persistence);
    public client2() throws MqttException {
    }

    //初始化设置订阅的回调
    public void init() {
        sampleClient.setCallback(new MqttCallback() {
            @Override
            public void connectionLost(Throwable throwable) {
            }

            //当有订阅的消息时会从这里接收
            @Override
            public void messageArrived(String topic, MqttMessage message) throws Exception {
                System.out.println("clien2收到主题为:" + topic + "的消息。------\n" + "消息为::" + new String(message.getPayload()));
            }


            @Override
            public void deliveryComplete(IMqttDeliveryToken token) {
                System.out.println("deliveryComplete---------" + token.isComplete());
            }
        });


    }

    ///
    public void connect() throws MqttException {
        // 创建链接参数
        MqttConnectOptions connOpts = new MqttConnectOptions();
        // 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,这里设置为true表示每次连接到服务器都以新的身份连接
        connOpts.setCleanSession(true);
        // 设置连接的用户名
        connOpts.setUserName(userName);
        connOpts.setPassword(password.toCharArray());
        // 建立连接
        IMqttToken iMqttToken = sampleClient.connectWithResult(connOpts);
        boolean r = iMqttToken.isComplete();
        if (r) {
            System.out.println("client2连接到服务器成功");
        } else {
            System.out.println("client2连接到服务器失败");
        }
    }

    public void publish(String mes) throws MqttException {
        // 创建消息
        MqttMessage message = new MqttMessage(mes.getBytes());
        // 设置消息的服务质量
        message.setQos(qos);
        // 发布消息
        sampleClient.publish(topic, message);
    }


    public void subscribe(String topic) throws MqttException {
        //订阅消息
        sampleClient.subscribe(topic, qos);
    }

    ///
    public void disconnect() throws MqttException {
        // 断开连接
        sampleClient.disconnect();
        // 关闭客户端
        sampleClient.close();
    }
}

客户端3

package com.geniuses.mqtt;

import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.stereotype.Component;

@Component
public class client3 {
    //主题名
    String topic = "test3";
    //QoS服务质量等级
    int qos = 1;
    //账号
    String userName = "admin";
    //密码
    String password = "public";
    String clientId = "Client3";
    // 内存存储
    MemoryPersistence persistence = new MemoryPersistence();
    //访问服务器地址
    private String broker = "tcp://127.0.0.1:1883";
    // 创建客户端
    MqttClient sampleClient = new MqttClient(broker, clientId, persistence);
    public client3() throws MqttException {
    }

    //初始化设置订阅的回调
    public void init() {
        sampleClient.setCallback(new MqttCallback() {
            @Override
            public void connectionLost(Throwable throwable) {
            }

            //当有订阅的消息时会从这里接收
            @Override
            public void messageArrived(String topic, MqttMessage message) throws Exception {
                System.out.println("clien3收到主题为:" + topic + "的消息。------\n" + "消息为::" + new String(message.getPayload()));
            }


            @Override
            public void deliveryComplete(IMqttDeliveryToken token) {
                System.out.println("deliveryComplete---------" + token.isComplete());
            }
        });


    }

    ///
    public void connect() throws MqttException {
        // 创建链接参数
        MqttConnectOptions connOpts = new MqttConnectOptions();
        // 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,这里设置为true表示每次连接到服务器都以新的身份连接
        connOpts.setCleanSession(true);
        // 设置连接的用户名
        connOpts.setUserName(userName);
        connOpts.setPassword(password.toCharArray());
        // 建立连接
        IMqttToken iMqttToken = sampleClient.connectWithResult(connOpts);
        boolean r = iMqttToken.isComplete();
        if (r) {
            System.out.println("client3连接到服务器成功");
        } else {
            System.out.println("client3连接到服务器失败");
        }
    }

    public void publish(String mes) throws MqttException {
        // 创建消息
        MqttMessage message = new MqttMessage(mes.getBytes());
        // 设置消息的服务质量
        message.setQos(qos);
        // 发布消息
        sampleClient.publish(topic, message);
    }


    public void subscribe(String topic) throws MqttException {
        //订阅消息
        sampleClient.subscribe(topic, qos);
    }

    ///
    public void disconnect() throws MqttException {
        // 断开连接
        sampleClient.disconnect();
        // 关闭客户端
        sampleClient.close();
    }
}

controller测试类

package com.geniuses.mqtt;

import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.JSONPObject;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;

import java.text.SimpleDateFormat;
import java.time.LocalDateTime;
import java.util.Date;
@RestController
@RequestMapping("mqttDemo")
public class MqttDemoController {
    @Autowired
    public client1 c1;
    @Autowired
    public client2 c2;
    @Autowired
    public client3 c3;

    @RequestMapping("init")//先让用户client1、client2、client3连接上服务器
    public void init() throws MqttException {
        c1.init();
        c1.connect();
        c2.init();
        c2.connect();
        c3.init();
        c3.connect();
    }


    @RequestMapping("client1/sub/{topic}")//让用户client1订阅一个话题
    public void client1_sub( @PathVariable String topic) throws MqttException {
        c1.subscribe(topic);
        System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())+
                "---client1订阅了名为:"+topic+"的。");
    }

    @RequestMapping("client1/pub/{mes}")//让用户主题client1发布一个消息(话题默认为"test1")
    public void client1_pub( @PathVariable String mes) throws MqttException {
        System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
        c1.publish(mes);
    }
    @PostMapping("saveAlgorithmData")
    public void saveAlgorithmData(@RequestBody test params) throws MqttException {
//        test algorithmTypeDic = JSONObject.parseObject(params, test.class);
       String a= JSONObject.toJSONString(params);
        c1.publish(a);
    }



    @RequestMapping("client2/sub/{topic}")//让用户client2订阅一个话题
    public void client2_sub( @PathVariable String topic) throws MqttException {
        c2.subscribe(topic);
        System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())+
                "---client2订阅了名为:"+topic+"的主题。");
    }

    @RequestMapping("client2/pub/{mes}")//让用户client2发布一个消息(话题默认为"test2")
    public void client2_pub( @PathVariable String mes) throws MqttException {
        System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
        c2.publish(mes);
    }




    @RequestMapping("client3/sub/{topic}")//让用户client3订阅一个话题
    public void client3_sub( @PathVariable String topic) throws MqttException {
        c3.subscribe(topic);
        System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())+
                "---client3订阅了名为:"+topic+"的主题。");
    }

    @RequestMapping("client3/pub/{mes}")//让用户client3发布一个消息(话题默认为"test3")
    public void client_pub( @PathVariable String mes) throws MqttException {
        System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
        c3.publish(mes);
    }
}

在地址栏输入 项目地址 请求 init接口 初始化即可


版权声明:本文为qq_43684538原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。