1,新建消息处理中心
package com.smartnest.ability.module.RabbitMq;
import java.util.concurrent.ArrayBlockingQueue;
/**
* @Author LS
* @Description 消息处理中心 Broker
* @Date 17:00 2022/9/21 0021
* @Param
* @return
**/
public class Broker {
// 队列存储消息的最大数量
private final static int MAX_SIZE = 5;
// 保存消息数据的容器
private static ArrayBlockingQueue messageQueue = new ArrayBlockingQueue(MAX_SIZE);
/**
* @Author LS
* @Description 生产消息
* @Date 17:02 2022/9/21 0021
* @Param
* @return
**/
public static void produce(String msg) {
if (messageQueue.offer(msg)) {
System.out.println("成功向消息处理中心投递消息:" + msg + ",当前暂存的消息数量是:" + messageQueue.size());
} else {
System.out.println("消息处理中心内暂存的消息达到最大负荷,不能继续放入消息!");
}
System.out.println("=======================");
}
/**
* @Author LS
* @Description 消费消息
* @Date 17:01 2022/9/21 0021
* @Param
* @return
**/
public static String consume() {
String msg = (String) messageQueue.poll();
if (msg != null) {
// 消费条件满足情况,从消息容器中取出一条消息
System.out.println("已经消费消息:" + msg + ",当前暂存的消息数量是:" + messageQueue.size());
} else {
System.out.println("消息处理中心内没有消息可供消费!");
}
System.out.println("=======================");
return msg;
}
}
2,建立消息处理中心服务 BrokerServer
package com.smartnest.ability.module.RabbitMq;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;
/**
* @Author LS
* @Description 消息处理中心服务 BrokerServer
* @用于启动消息处理中心
* @Date 17:13 2022/9/21 0021
* @Param
* @return
**/
public class BrokerServer implements Runnable {
public static int SERVICE_PORT = 9999;
private final Socket socket;
public BrokerServer(Socket socket) {
this.socket = socket;
}
/**
* @Author LS
* @Description 消息服务
* @Date 17:47 2022/9/21 0021
* @Param []
* @return void
**/
@Override
public void run() {
try (
BufferedReader in = new BufferedReader(new InputStreamReader(
socket.getInputStream()));
PrintWriter out = new PrintWriter(socket.getOutputStream())
) {
while (true) {
String str = in.readLine();
if (str == null) {
continue;
}
System.out.println("接收到原始数据:" + str);
if (str.equals("CONSUME")) { //CONSUME 表示要消费一条消息
//从消息队列中消费一条消息
String message = Broker.consume();
out.println(message);
out.flush();
} else if (str.contains("SEND:")){
//接受到的请求包含SEND:字符串 表示生产消息放到消息队列中
Broker.produce(str);
} else {
System.out.println("原始数据:"+str+"没有遵循协议,不提供相关服务");
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* @Author LS
* @Description 启动服务 建立 链接
* @Date 17:54 2022/9/21 0021
* @Param [args]
* @return void
**/
public static void main(String[] args) throws Exception {
ServerSocket server = new ServerSocket(SERVICE_PORT);
while (true) {
BrokerServer brokerServer = new BrokerServer(server.accept());
new Thread(brokerServer).start();
}
}
}
3,消费者 - 消费消息 代码
package com.smartnest.ability.module.RabbitMq;
/**
* @Author LS
* @Description 消费者
* @Date 17:16 2022/9/21 0021
* @Param
* @return
**/
public class ConsumeClient {
public static void main(String[] args) throws Exception {
System.out.println("---------------------------消费====消息----------------------------");
for (int i = 0; i < 7; i++) {
String message = MqClient.consume();
System.out.println("消费者 获取的消息 = :" + message);
}
}
}
4,客户端 MqClient
package com.smartnest.ability.module.RabbitMq;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.InetAddress;
import java.net.Socket;
/**
* @Author LS
* @Description 客户端 MqClient
* @访问消息队列的客户端
* @Date 17:14 2022/9/21 0021
* @Param
* @return
**/
public class MqClient {
//生产消息
public static void produce(String message) throws Exception {
//本地的的BrokerServer.SERVICE_PORT 创建SOCKET
Socket socket = new Socket(InetAddress.getLocalHost(), BrokerServer.SERVICE_PORT);
try (
PrintWriter out = new PrintWriter(socket.getOutputStream())
) {
System.out.println("生产者 发布的消息 = : " + message);
out.println(message);
out.flush();
}
}
//消费消息
public static String consume() throws Exception {
Socket socket = new Socket(InetAddress.getLocalHost(), BrokerServer.SERVICE_PORT);
try (
BufferedReader in = new BufferedReader(new InputStreamReader(
socket.getInputStream()));
PrintWriter out = new PrintWriter(socket.getOutputStream())
) {
//先向消息队列发送命令
out.println("CONSUME");
out.flush();
//再从消息队列获取一条消息
String message = in.readLine();
return message;
}
}
}
5,生产者
package com.smartnest.ability.module.RabbitMq;
/**
* @Author LS
* @Description 生产者
* @Date 17:15 2022/9/21 0021
* @Param
* @return
**/
public class ProduceClient {
public static void main(String[] args) throws Exception {
//SEND: 表示通道, 可以是不同的通道
//生产 消息
MqClient.produce("SEND:Hello World");
MqClient.produce("SEND:Hello World1");
MqClient.produce("SEND:Hello World2");
MqClient.produce("SEND:Hello World3");
MqClient.produce("SEND:Hello World4");
MqClient.produce("SEND:Hello World5");
MqClient.produce("SEND:Hello World6");
}
}
需要先启动:
BrokerServer
然后启动:生成消息服务
ProduceClient
最后测试运行:
ConsumeClient
控制台输出:
Connected to the target VM, address: '127.0.0.1:62545', transport: 'socket'
接收到原始数据:SEND:Hello World1
接收到原始数据:SEND:Hello World2
接收到原始数据:SEND:Hello World3
接收到原始数据:SEND:Hello World
接收到原始数据:SEND:Hello World5
接收到原始数据:SEND:Hello World6
接收到原始数据:SEND:Hello World4
成功向消息处理中心投递消息:SEND:Hello World1,当前暂存的消息数量是:3
=======================
消息处理中心内暂存的消息达到最大负荷,不能继续放入消息!
=======================
成功向消息处理中心投递消息:SEND:Hello World3,当前暂存的消息数量是:3
=======================
成功向消息处理中心投递消息:SEND:Hello World5,当前暂存的消息数量是:3
=======================
消息处理中心内暂存的消息达到最大负荷,不能继续放入消息!
消息处理中心内暂存的消息达到最大负荷,不能继续放入消息!
=======================
消息处理中心内暂存的消息达到最大负荷,不能继续放入消息!
=======================
=======================
Disconnected from the target VM, address: '127.0.0.1:62545', transport: 'socket'版权声明:本文为ls6658原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。