MQ实现DEMO-入门

1,新建消息处理中心

package com.smartnest.ability.module.RabbitMq;

import java.util.concurrent.ArrayBlockingQueue;

/**
 * @Author LS
 * @Description 消息处理中心 Broker
 * @Date 17:00 2022/9/21 0021
 * @Param
 * @return
 **/
public class Broker {

    // 队列存储消息的最大数量
    private final static int MAX_SIZE = 5;

    // 保存消息数据的容器
    private static ArrayBlockingQueue messageQueue = new ArrayBlockingQueue(MAX_SIZE);

    /**
     * @Author LS
     * @Description 生产消息
     * @Date 17:02 2022/9/21 0021
     * @Param
     * @return
     **/
    public static void produce(String msg) {
        if (messageQueue.offer(msg)) {
            System.out.println("成功向消息处理中心投递消息:" + msg + ",当前暂存的消息数量是:" + messageQueue.size());
        } else {
            System.out.println("消息处理中心内暂存的消息达到最大负荷,不能继续放入消息!");
        }
        System.out.println("=======================");

    }

    /**
     * @Author LS
     * @Description 消费消息
     * @Date 17:01 2022/9/21 0021
     * @Param
     * @return
     **/
    public static String consume() {

        String msg = (String) messageQueue.poll();
        if (msg != null) {
            // 消费条件满足情况,从消息容器中取出一条消息
            System.out.println("已经消费消息:" + msg + ",当前暂存的消息数量是:" + messageQueue.size());
        } else {
            System.out.println("消息处理中心内没有消息可供消费!");
        }
        System.out.println("=======================");
        return msg;
    }

}

2,建立消息处理中心服务 BrokerServer

package com.smartnest.ability.module.RabbitMq;


import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;

/**
 * @Author LS
 * @Description 消息处理中心服务 BrokerServer
 * @用于启动消息处理中心
 * @Date 17:13 2022/9/21 0021
 * @Param
 * @return
 **/
public class BrokerServer  implements Runnable {

    public static int SERVICE_PORT = 9999;

    private final Socket socket;

    public BrokerServer(Socket socket) {
        this.socket = socket;
    }

    /**
     * @Author LS
     * @Description 消息服务
     * @Date 17:47 2022/9/21 0021
     * @Param []
     * @return void
     **/
    @Override
    public void run() {
        try (
                BufferedReader in = new BufferedReader(new InputStreamReader(
                        socket.getInputStream()));
                PrintWriter out = new PrintWriter(socket.getOutputStream())
        ) {
            while (true) {
                String str = in.readLine();
                if (str == null) {
                    continue;
                }
                System.out.println("接收到原始数据:" + str);
                if (str.equals("CONSUME")) { //CONSUME 表示要消费一条消息
                    //从消息队列中消费一条消息
                    String message = Broker.consume();
                    out.println(message);
                    out.flush();
                } else if (str.contains("SEND:")){
                    //接受到的请求包含SEND:字符串 表示生产消息放到消息队列中
                    Broker.produce(str);
                } else {
                    System.out.println("原始数据:"+str+"没有遵循协议,不提供相关服务");
                }
            }

        } catch (Exception e) {
            e.printStackTrace();
        }

    }
    /**
     * @Author LS
     * @Description 启动服务 建立 链接
     * @Date 17:54 2022/9/21 0021
     * @Param [args]
     * @return void
     **/
    public static void main(String[] args) throws Exception {
        ServerSocket server = new ServerSocket(SERVICE_PORT);
        while (true) {
            BrokerServer brokerServer = new BrokerServer(server.accept());
            new Thread(brokerServer).start();
        }
    }

}

3,消费者 - 消费消息 代码

package com.smartnest.ability.module.RabbitMq;


/**
 * @Author LS
 * @Description 消费者
 * @Date 17:16 2022/9/21 0021
 * @Param
 * @return
 **/
public class ConsumeClient {

    public static void main(String[] args) throws Exception {
        System.out.println("---------------------------消费====消息----------------------------");
        for (int i = 0; i < 7; i++) {
            String message = MqClient.consume();
            System.out.println("消费者 获取的消息 = :" + message);
        }

    }
}

4,客户端 MqClient

package com.smartnest.ability.module.RabbitMq;


import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.InetAddress;
import java.net.Socket;

/**
 * @Author LS
 * @Description 客户端 MqClient
 * @访问消息队列的客户端
 * @Date 17:14 2022/9/21 0021
 * @Param
 * @return
 **/
public class MqClient {

    //生产消息
    public static void produce(String message) throws Exception {
        //本地的的BrokerServer.SERVICE_PORT 创建SOCKET
        Socket socket = new Socket(InetAddress.getLocalHost(), BrokerServer.SERVICE_PORT);
        try (
                PrintWriter out = new PrintWriter(socket.getOutputStream())
        ) {
            System.out.println("生产者 发布的消息 = : " + message);
            out.println(message);
            out.flush();
        }
    }

    //消费消息
    public static String consume() throws Exception {
        Socket socket = new Socket(InetAddress.getLocalHost(), BrokerServer.SERVICE_PORT);
        try (
                BufferedReader in = new BufferedReader(new InputStreamReader(
                        socket.getInputStream()));
                PrintWriter out = new PrintWriter(socket.getOutputStream())
        ) {
            //先向消息队列发送命令
            out.println("CONSUME");
            out.flush();
            //再从消息队列获取一条消息
            String message = in.readLine();
            return message;
        }
    }

}

5,生产者

package com.smartnest.ability.module.RabbitMq;


/**
 * @Author LS
 * @Description 生产者
 * @Date 17:15 2022/9/21 0021
 * @Param
 * @return
 **/
public class ProduceClient {

    public static void main(String[] args) throws Exception {
        //SEND:  表示通道, 可以是不同的通道
        //生产  消息
        MqClient.produce("SEND:Hello World");
        MqClient.produce("SEND:Hello World1");
        MqClient.produce("SEND:Hello World2");
        MqClient.produce("SEND:Hello World3");
        MqClient.produce("SEND:Hello World4");
        MqClient.produce("SEND:Hello World5");
        MqClient.produce("SEND:Hello World6");
    }

}

需要先启动:

BrokerServer

然后启动:生成消息服务

ProduceClient

最后测试运行:

ConsumeClient

控制台输出:

Connected to the target VM, address: '127.0.0.1:62545', transport: 'socket'
接收到原始数据:SEND:Hello World1
接收到原始数据:SEND:Hello World2
接收到原始数据:SEND:Hello World3
接收到原始数据:SEND:Hello World
接收到原始数据:SEND:Hello World5
接收到原始数据:SEND:Hello World6
接收到原始数据:SEND:Hello World4
成功向消息处理中心投递消息:SEND:Hello World1,当前暂存的消息数量是:3
=======================
消息处理中心内暂存的消息达到最大负荷,不能继续放入消息!
=======================
成功向消息处理中心投递消息:SEND:Hello World3,当前暂存的消息数量是:3
=======================
成功向消息处理中心投递消息:SEND:Hello World5,当前暂存的消息数量是:3
=======================
消息处理中心内暂存的消息达到最大负荷,不能继续放入消息!
消息处理中心内暂存的消息达到最大负荷,不能继续放入消息!
=======================
消息处理中心内暂存的消息达到最大负荷,不能继续放入消息!
=======================
=======================
Disconnected from the target VM, address: '127.0.0.1:62545', transport: 'socket'


版权声明:本文为ls6658原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。