RabbitMq系列(十一):远程调用RPC

系列文章

RabbitMq系列(一):服务器搭建

RabbitMq系列(二):最简单的例子

RabbitMq系列(三):工作队列

RabbitMq系列(四):消息确认和持久性

RabbitMq系列(五):公平派遣

RabbitMq系列(六):交换类型以及例子

RabbitMq系列(七):直接交换Direct exchange

RabbitMq系列(八):扇出交换Fanout Exchange

RabbitMq系列(九):主题交换Topic Exchange

RabbitMq系列(十):标头交换Headers exchange

RabbitMq系列(十一):远程调用RPC

 

目录

前言

客户端

服务器端

测试

总结


前言

RabbitMq作为异步通信机制的中间件,大部分情况下,是不需要双方通信的,但是这个时候如果我们需要双方进行更深入的交流呢?例如:在远程服务上执行某个功能过后,我们需要即时地知道结果,然后进行后续操作。

RabbitMq的双方通信中,客户端和服务器,两边都是同样的角色,既是消息生产者又是消息消费者。

RabbitMq构建RPC系统采用解耦的方式,工作流程如下:

  • 客户端规定返回消息的队列名称( replyTo)和请求唯一值( correlationId),发送消息的时候携带着 replyTo 和 correlationId,本地也要缓存一份 correlationId
  • 服务器收到客户端路由过来的消息,调用自己的方法对消息进行处理,处理完消息后,将消息路由到 replyTo 这个指定的队列中,消息携带着 correlationId。
  • 客户端收到服务器返回的结果,通过对比消息携带的 correlationId 和自己本地缓存的  correlationId 判断是否是自己需要的返回,判断是则进行处理

RabbitMq构建RPC系统实际上是生产者和消费者的角色互换应用。

客户端

客户端主要定义返回消息的队列名称和请求唯一值。

public class MQRPCClient {

    public static void main(String[] args) {

        try {
            pulishAndRecieve("rpc.queue","请MQRPCServer注意!敌军30秒后抵达战场!");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * 构建消息通道
     */
    public static  Channel buildChannel() throws IOException, TimeoutException {
        //创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.239.128");
        factory.setPort(5672);
        factory.setVirtualHost("mqtest");
        factory.setUsername("mqtest");
        factory.setPassword("test123");
        //创建连接
        Connection connection = factory.newConnection();
        //创建信道
        Channel channel = connection.createChannel();
        return channel;
    }

    /**
     * 发送消息
     * @param requestQueue
     * @throws IOException
     * @throws TimeoutException
     */
    public static void  pulishAndRecieve(String requestQueue,String msg) throws IOException, TimeoutException {
        //构建信道
        Channel channel = buildChannel();
        //创建排他、自动删除、临时的队列,为了接受返回的消息——reply
        String recieveQueue = channel.queueDeclare().getQueue();
        //创建请求唯一值,为了确认是哪一个请求的标识——correlationId
        String correlationId = UUID.randomUUID().toString();
        //携带参数
        AMQP.BasicProperties basicProperties = new AMQP.BasicProperties.Builder().replyTo(recieveQueue).correlationId(correlationId).build();
        //向服务器发送消息
        channel.basicPublish("",requestQueue,basicProperties,msg.getBytes());
        //等待接受消息
        channel.basicConsume(recieveQueue,false,(consumerTag, message) -> {
            String recieveCorrelationId = message.getProperties().getCorrelationId();
            if(recieveCorrelationId.equals(correlationId))
            {
                //TODO 消息处理
                processMsg(channel,message);
            }
        },(consumerTag, sig) -> {
            try {
                channel.basicCancel(consumerTag);
            } catch (IOException e) {
                e.printStackTrace();
            }
        });
    }

    /**
     * 处理返回消息
     * @param message
     */
    public static void processMsg(Channel channel,Delivery message) throws IOException {
        String msg = new String(message.getBody());
        System.out.println(msg);
        channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
        try {
            close(channel);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * 关闭连接
     * @param channel
     */
    public static void close(Channel channel) throws IOException, TimeoutException {
        Connection connection = channel.getConnection();
        channel.close();
        connection.close();
    }
}

服务器端

服务器主要是对接收到消息处理过程和结果返回。

public class MQRPCServer {

    public static void main(String[] args) {
        try {
            consumeAndReply("rpc.queue");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * 构建消息通道
     */
    public static Channel buildChannel(String queueName) throws IOException, TimeoutException {
        //创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.239.128");
        factory.setPort(5672);
        factory.setVirtualHost("mqtest");
        factory.setUsername("mqtest");
        factory.setPassword("test123");
        //创建连接
        Connection connection = factory.newConnection();
        //创建信道
        Channel channel = connection.createChannel();
        //声明队列
        channel.queueDeclare(queueName,false,false,false,null);
        //队列中处理的消息数量
        channel.basicQos(0,1,false);
        return channel;
    }

    /**
     * 消费消息和回复消息
     * @param queueName
     */
    public  static void consumeAndReply(String queueName) throws IOException, TimeoutException {
        //构建信道
        Channel channel = buildChannel(queueName);
        //构建回调函数
        DeliverCallback callback = (consumerTag, message) -> {
            //处理消息
            processMsg(message);
            //回复消息处理
            replyMsg(channel,message);
        };
        //消费消息
        channel.basicConsume(queueName,false,callback,(consumerTag, sig) -> {
            //TODO
        });
    }


    /**
     * 处理收到的消息
     * @param message
     */
    public static void processMsg(Delivery message)
    {
        String msg = new String(message.getBody());
        System.out.println("收到"+message.getProperties().getCorrelationId()+"消息:"+msg);
    }


    /**
     * 回复消息
     * @param channel
     * @param message
     */
    public static void replyMsg(Channel channel,Delivery message) throws IOException {

        //获取消息携带的correlationId
        String correlationId = message.getProperties().getCorrelationId();
        //获取消息携带的replyTo
        String replyTo = message.getProperties().getReplyTo();
        String msg ="消息收到!全军待命!请"+correlationId+"确认!";
        AMQP.BasicProperties basicProperties = new AMQP.BasicProperties.Builder().correlationId(correlationId).build();
        channel.basicPublish("",replyTo,basicProperties,msg.getBytes());
        channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
    }
}

测试

  • 先启动服务器MQRPCServer,再启动客户端MQRPCClient
  • 查看结果

总结

  1. 客户端要自己定义自己接受的队列 replyTo
  2. 客户端发送消息要携带 replyTo 和自己的请求唯一值 correlationId 
  3. 为了保证消息的负载均衡,也就是工作队列,请设置 basicQos = 1
  4. 客户端对返回的消息要自己过滤和筛选