系列文章
RabbitMq系列(七):直接交换Direct exchange
RabbitMq系列(八):扇出交换Fanout Exchange
RabbitMq系列(九):主题交换Topic Exchange
RabbitMq系列(十):标头交换Headers exchange
目录
前言
RabbitMq作为异步通信机制的中间件,大部分情况下,是不需要双方通信的,但是这个时候如果我们需要双方进行更深入的交流呢?例如:在远程服务上执行某个功能过后,我们需要即时地知道结果,然后进行后续操作。
RabbitMq的双方通信中,客户端和服务器,两边都是同样的角色,既是消息生产者又是消息消费者。
RabbitMq构建RPC系统采用解耦的方式,工作流程如下:
- 客户端规定返回消息的队列名称( replyTo)和请求唯一值( correlationId),发送消息的时候携带着 replyTo 和 correlationId,本地也要缓存一份 correlationId。
- 服务器收到客户端路由过来的消息,调用自己的方法对消息进行处理,处理完消息后,将消息路由到 replyTo 这个指定的队列中,消息携带着 correlationId。
- 客户端收到服务器返回的结果,通过对比消息携带的 correlationId 和自己本地缓存的 correlationId 判断是否是自己需要的返回,判断是则进行处理
RabbitMq构建RPC系统实际上是生产者和消费者的角色互换应用。
客户端
客户端主要定义返回消息的队列名称和请求唯一值。
public class MQRPCClient {
public static void main(String[] args) {
try {
pulishAndRecieve("rpc.queue","请MQRPCServer注意!敌军30秒后抵达战场!");
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 构建消息通道
*/
public static Channel buildChannel() throws IOException, TimeoutException {
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.239.128");
factory.setPort(5672);
factory.setVirtualHost("mqtest");
factory.setUsername("mqtest");
factory.setPassword("test123");
//创建连接
Connection connection = factory.newConnection();
//创建信道
Channel channel = connection.createChannel();
return channel;
}
/**
* 发送消息
* @param requestQueue
* @throws IOException
* @throws TimeoutException
*/
public static void pulishAndRecieve(String requestQueue,String msg) throws IOException, TimeoutException {
//构建信道
Channel channel = buildChannel();
//创建排他、自动删除、临时的队列,为了接受返回的消息——reply
String recieveQueue = channel.queueDeclare().getQueue();
//创建请求唯一值,为了确认是哪一个请求的标识——correlationId
String correlationId = UUID.randomUUID().toString();
//携带参数
AMQP.BasicProperties basicProperties = new AMQP.BasicProperties.Builder().replyTo(recieveQueue).correlationId(correlationId).build();
//向服务器发送消息
channel.basicPublish("",requestQueue,basicProperties,msg.getBytes());
//等待接受消息
channel.basicConsume(recieveQueue,false,(consumerTag, message) -> {
String recieveCorrelationId = message.getProperties().getCorrelationId();
if(recieveCorrelationId.equals(correlationId))
{
//TODO 消息处理
processMsg(channel,message);
}
},(consumerTag, sig) -> {
try {
channel.basicCancel(consumerTag);
} catch (IOException e) {
e.printStackTrace();
}
});
}
/**
* 处理返回消息
* @param message
*/
public static void processMsg(Channel channel,Delivery message) throws IOException {
String msg = new String(message.getBody());
System.out.println(msg);
channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
try {
close(channel);
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 关闭连接
* @param channel
*/
public static void close(Channel channel) throws IOException, TimeoutException {
Connection connection = channel.getConnection();
channel.close();
connection.close();
}
}
服务器端
服务器主要是对接收到消息处理过程和结果返回。
public class MQRPCServer {
public static void main(String[] args) {
try {
consumeAndReply("rpc.queue");
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 构建消息通道
*/
public static Channel buildChannel(String queueName) throws IOException, TimeoutException {
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.239.128");
factory.setPort(5672);
factory.setVirtualHost("mqtest");
factory.setUsername("mqtest");
factory.setPassword("test123");
//创建连接
Connection connection = factory.newConnection();
//创建信道
Channel channel = connection.createChannel();
//声明队列
channel.queueDeclare(queueName,false,false,false,null);
//队列中处理的消息数量
channel.basicQos(0,1,false);
return channel;
}
/**
* 消费消息和回复消息
* @param queueName
*/
public static void consumeAndReply(String queueName) throws IOException, TimeoutException {
//构建信道
Channel channel = buildChannel(queueName);
//构建回调函数
DeliverCallback callback = (consumerTag, message) -> {
//处理消息
processMsg(message);
//回复消息处理
replyMsg(channel,message);
};
//消费消息
channel.basicConsume(queueName,false,callback,(consumerTag, sig) -> {
//TODO
});
}
/**
* 处理收到的消息
* @param message
*/
public static void processMsg(Delivery message)
{
String msg = new String(message.getBody());
System.out.println("收到"+message.getProperties().getCorrelationId()+"消息:"+msg);
}
/**
* 回复消息
* @param channel
* @param message
*/
public static void replyMsg(Channel channel,Delivery message) throws IOException {
//获取消息携带的correlationId
String correlationId = message.getProperties().getCorrelationId();
//获取消息携带的replyTo
String replyTo = message.getProperties().getReplyTo();
String msg ="消息收到!全军待命!请"+correlationId+"确认!";
AMQP.BasicProperties basicProperties = new AMQP.BasicProperties.Builder().correlationId(correlationId).build();
channel.basicPublish("",replyTo,basicProperties,msg.getBytes());
channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
}
}
测试
- 先启动服务器MQRPCServer,再启动客户端MQRPCClient
- 查看结果

总结
- 客户端要自己定义自己接受的队列 replyTo
- 客户端发送消息要携带 replyTo 和自己的请求唯一值 correlationId
- 为了保证消息的负载均衡,也就是工作队列,请设置 basicQos = 1
- 客户端对返回的消息要自己过滤和筛选