(转载)消息队列之 RabbitMQ(二)

原文地址:https://www.cnblogs.com/dongkuo/p/6001791.html

Tutorials

在学习 6 中模型之前,我们首先需要安装 RabbitMQ。RabbitMQ 支持多种系统平台,各平台的安装方法可以 http://www.rabbitmq.com/download.html 查看。安装好之后,我们使用如下命令启用 Web 端的管理插件:

rabbitmq-plugins enable rabbitmq_management,然后启动RabbitMQ。接着用浏览器访问http://localhost:15672/,若能看到RabbitMQ相关Web页面,说明启动成功。

Hello World

我们先从最简单的 Hello World 开始。首先当然是新建一个项目,导入 RabbitMQ 相关 jar ,我采用 Maven 来构建项目,因此只需要在 pom 文件汇总添加如下依赖:

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>3.6.5</version>
</dependency>

接下来学习最简单的消息队列模型,如下图:

在这里插入图片描述

在图中,P代表producer,它是消息的生产者;C代表consumer,它是消息的消费者;而红色的矩形正是我们所谓的消息队列,它位于RabbitMQ中(RabbitMQ中可以有很多这样的队列,并且每个队列都有一个唯一的名字)。生产者(们)可以将消息发送到消息队列中,消费者(们)可以从消息队列中取出消息。

这种模型是不是很简单呢?下面我们使用 Java ,借助于 RabbitMQ 来实现这种模型的消息通信。

首先我们介绍如何 send 消息到消息队列。send 之前,当然是和 RabbitMQ 服务器建立连接:

ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();

接下来我们创建一个 channel ,大多数 API 都是通过这个对象来调用的:

Channel channel = connection.createChannel();

之后,我们便可以建一个 channel,大多数 API 都是通过这个对象来调用的:

channel.queueDeclare("hello", false, false, false, null);

该方法的第一个参数是队列的名称,其余的参数先不管,之后会介绍。我们可以尝试着去执行以上的 5行代码,然后打开 Web 端,可以看到新建了一个叫做 hello 的队列:

在这里插入图片描述

有了队列,我们便可以向其中发送消息了,同样还是调用 channel 对象的 API:

channel.basicPublish("", "hello", null, "Hello World".getBytes());

以上代码所做的事情就是发送了一条字符串消息 “Hello World”(第 4 个参数)到消息队列。你可能注意到 我们调用了 string 对象的 getBytes 方法,没错,饿哦们发送的实际上是二进制数据。因此,理论上你能够发送任何数据到消息队列中,而不仅仅是文本信息。

第 2 个参数叫做路由键(routing key),在该模型下必须与队列名相同,至于为什么,和其他参数一样,之后会了解到。

我们可以修改发送的文本,再次执行上述代码,然后打开 Web 端查看,便可以查看到我们发送的消息:

在这里插入图片描述

点击上图的 name 字段下的 hello,可以查看 hello 队列中的具体信息:

在这里插入图片描述
接下来,我们去尝试着去获取 生产者发送的消息,和 send 方法一样,我们同样需要连接服务器创建 channel ,声明队列:

ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("hello", false, false, false, null);

之后我们可以调用 channel 的相关方法去监听队列,接收消息:

channel.basicConsume("hello", true, new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            System.out.println(new String(body, "UTF-8"));
        }
});

以上 basicConsume 方法中,第一个参数是队列的名字;第二个参数表示是否自动确认消息的接收情况,我们使用 true,自动确认;第三个参数需要传入一个实现了 Consumer 接口的对象,我们简单的 new 一个默认的 Consumer 的实现类 DefaultComsumer,然后在 handleDelivery 方法中去处理接收到的消息(handleDelivery 方法会在接收到消息时被回调)。

运行以上代码,我们可以打印出之前向队列中 send 的数据:

Hello World
Hello World2

下面是 Hello World 的完整代码:

public class App {

    @Test
    public void send() throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare("hello", false, false, false, null);

        channel.basicPublish("", "hello", null, "Hello World2".getBytes());

        channel.close();
        connection.close();
    }

    @Test
    public void receive() throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare("hello", false, false, false, null);

        channel.basicConsume("hello", true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println(new String(body, "UTF-8"));
            }

        });
        synchronized (this){
            // 因为以上接收消息的方法是异步的(非阻塞),当采用单元测试方式执行该方法时,程序会在打印消息前结束,因此使用wait来防止程序提前终止。若使用main方法执行,则不需要担心该问题。
            wait();
        }
    }
}

Work queues

接下来我们学习第二种模型——Work Queues 。顾名思义,这种模型描述的是一个生产者(Boss)向队列发消息(任务),多个消费者(worker)从队列接受消息(任务),如下图所示:

在这里插入图片描述

下面我们用代码去实现。先是生产者 send 消息到队列,这次我们多发送些数据:

@Test
public void send() throws IOException, TimeoutException {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();
    channel.queueDeclare("hello", false, false, false, null);

    for (int i = 0; i < 9; i++) {
        channel.basicPublish("", "hello", null, String.valueOf(i).getBytes());
    }

    channel.close();
    connection.close();
}

然后是消费者接收数据:

@Test
public void receive() throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();
    channel.queueDeclare("hello", false, false, false, null);

    channel.basicConsume("hello", true, new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            System.out.println(new String(body, "UTF-8"));
            try {
            //  Thread.sleep(1000);
                Thread.sleep(10000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    });
    synchronized (this) {
        wait();
    }
}

代码基本上和 Hello World 的代码一样,只是加上句 sleep 来模拟消费者(worker)处理消息所花的时间。

我们可以先执行三次 receive 方法(修改 sleep 的时间,其中消费者1 sleep 10s,消费者2,3 sleep 1s),让消费者(worker)一起等待消息的到来,然后执行 send 方法发送 9 条消息,观察三个消费者收到的消息情况。

若不出意外,你会看到如下的打印结果:

// --------消费者1--------
0
// 10s 后
3
// 10s 后
6
// --------消费者2--------
1
// 1s 后
4
// 1s 后
7
// --------消费者3--------
2
// 1s 后
5
// 1s 后
8

通过打印结果,我么可以总结出 Work queues 的几个特点:

  1. 一条消息只会被一个消费者接收;
  2. 消息是平均分配给消费者的;
  3. 消费者只有在处理完某条消息后,才会收到下一条消息。

事实上,RabbitMQ 会循环地(一个接一个地)发送消息给消费者,这种分配消息的方式别称为 round-robin(轮询)。

消息确认

看到这里,不知你是否会担心:由于 worker (消费者)执行任务需要一定的时间(以上用 sleep 模拟),要是某个 worker 在运行过程中挂掉,那分配给它的任务岂不是丢失了(永远不可能被执行了)。为解决这个问题,RabbitMQ 提供了消息确认机制,即 worker 需要主动的去确认消息已经接收了,RabbitMQ 才认为消息被“真正地接收”了,实现代码如下:

// send的代码不用变,只需改变basicConsume的第二个参数为false,表示不要自动确认
channel.basicConsume("hello", false, new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        System.out.println(new String(body, "UTF-8"));
        try {
            // 这里把时间加长了一点便于测试
            Thread.sleep(8000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            // 这里手动地确定
            channel.basicAck(envelope.getDeliveryTag(), false);
        }
    }
});

下面做测试,首先执行 send 方法,发送 9 条消息到队列,查看 Web 端情况如下:

在这里插入图片描述

此时队列中有 9 条未被分发的消息。接着运行改变后的 receive 方法,然后快速地去 Web 端查看队列中的消息情况 (记得刷新):

在这里插入图片描述

发现队列中 没有待分发 (Ready 字段)的消息了,而有 9 条未被确认 (Unacked 字段)的消息。但控制台打印出数字 6 时,关闭程序看,再次去 Web 端查看:

在这里插入图片描述

此时队列中又有 3条待分发的消息了。原因正是由于我们提前终止了 receive 方法的执行,导致最后 3条消息没有被确认而被重新归还到 Ready 中。

消息持久化

如果你不是一次性跟着本文运行代码到这里,而是第二天接着昨天的工作继续进行,你可能会发现昨天你创建的队列和添加到队列里的消息没有了。很可能的原因就是消息没有持久化,即按照有上代码运行生成的队列和添加到队列中的消息都是储存在内存中的,RabbitMQ 一旦关闭它们就没有了。如果你想将下次启动时还能看到关闭前的消息,你应该将其持久化:

// 将第二个参数设为true,表示声明一个需要持久化的队列。
// 需要注意的是,若你已经定义了一个非持久的,同名字的队列,要么将其先删除(不然会报错),要么换一个名字。
channel.queueDeclare("hello", true, false, false, null);
// 修改了第三个参数,这是表明消息需要持久化
channel.basicPublish("", "hello",
            MessageProperties.PERSISTENT_TEXT_PLAIN,
            message.getBytes());

总的来说,Work queues(Task queues)的概念在一些 Web 场景的应用中是很有用的,比如我们能够用它来构建一个 master-slave 结构的分布式爬虫系统:系统中有一个 master 节点和多个 slave 节点, master 节点负责向各个 slave 节点分配爬取任务。

Publish / Subscribe

但有些时候,我们可能希望一条消息能够被多个消费者接收到,比如一些公告信息等,这时候用 Work Queue 模型显然不合适,而 Publish / Subscribe 模型正式对应这种使用场景的。

在介绍 Publish / Subscribe 之前,我们快速地回顾之前的两个模型,它们好像都是生产者将消息直接发送到消息队列,但其实不是这样的,甚至有可能生产者根本就不知道消息发送到了哪一个消息队列。

先别着急,下面我们完整地介绍 RabbitMQ 消息发送 / 接受 的方式。

事实上,生产者是把消息发送到了交换机 (exchange)中,然后交换机负责(决定)将消息发送到 (哪一个)消息队列中,其模型如下图:

在这里插入图片描述

这时候你可能会疑惑:既然消息被发送到了交换机中,那我们之前发送的消息是被消息是被到送到了哪一个交换机中了?它有没有机制能够让特定的消息发送到指定的队列?

先回答第一个问题。还记得我们在 Hello World 中写的发送消息的代码吗?

channel.basicPublish("", "hello", null, message.getBytes());

事实上第一个 参数便是指定 交换机的名字,即指定消息被发送到哪一个交换机。空字符串表示默认交换机(Default Exchange),即我们之前发送的消息都是先发送到 默认交换机,然后它再路由到相应的队列中,其实我们可以通过 Web 页面去查看所有存在的交换机:

在这里插入图片描述

接着回答第二个问题。路由的依据便是通过 第二个参数 —— 路由键(routing key)指定的,之前已经提到过。在之前代码中,我们指定第二个参数为“hello”,便是指定消息应该被交换机路由到路由键为 hello 的队列中,而默认交换机(Default Exchange)有一个非常有用的性质:

每一个被创建的队列都会被自动的绑定到默认交换机上,并且路由键就是队列的名字。

交换机还有 4 中不同的类型,分别是 direct, fanout,topic,headers,每种类型路由的策略不同。

direct 类型的及哦啊换季要求和它绑定的队列带有一个路由键 K,若有一个带有路由键 R 的消息到达了交换机,交换机会将此消息路由到路由键 K = R 的队列。默认交换机便是该类型。因此,在下图中,消息会沿着绿色箭头路由:

在这里插入图片描述

fanout 类型的交换机会路由每一条消息到所有和它绑定的队列,忽略路由键。

剩下的两种类型之后再做介绍。

在以上概念基础上,我们来看第 3 种消息模型:Publish / Subscribe 。 如下图:

在这里插入图片描述

该模型是要让所有的消费者都能够接收到每一条消息。显然, fanout 类型的交换机更符合我们当前的需求。为此,先创建一个 fanout 类型的交换机。

channel.exchangeDeclare("notice", "fanout");

其中,第一个参数是交换机的名称;第二个参数是交换机的类型。

然后我们可以 send 消息了:

channel.basicPublish( "notice", "", null, message.getBytes());

对于消费者,我们需要为每一个消费者创建一个独立的队列,然后将队列绑定到刚才指定的交换机上即可:

// 该方法会创建一个名称随机的临时队列
String queueName = channel.queueDeclare().getQueue();
// 将队列绑定到指定的交换机("notice")上
channel.queueBind(queueName, "notice", "");

以下完整的代码:

@Test
public void send() throws IOException, TimeoutException {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();
    channel.exchangeDeclare("notice", "fanout");
    channel.basicPublish( "notice", "", null, "Hello China".getBytes());
    channel.close();
    connection.close();
}

@Test
public void receive() throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();
    channel.exchangeDeclare("notice", "fanout");
    String queueName = channel.queueDeclare().getQueue();
    channel.queueBind(queueName, "notice", "");

    channel.basicConsume(queueName, true, new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            System.out.println(new String(body, "UTF-8"));
        }

    });
    synchronized (this) {
        wait();
    }
}

首先运行两次 receive 方法,让两个消费者等待接收消息,然后可以在 Web 端查看此时的队列情况,如下图所示:

在这里插入图片描述

可以看到图中有两个名称随机的队列。接着运行 send 方法发送一条消息,最终我们会看到两个消费者都打印出了 Hello China 。然后停止虚拟机让消费者断开连接,再次在 Web 端查看队列情况,会发现刚才的两个队列被自动删除了。

Routing

以上是 Publish / Subscribe 模式,它已经能让我们的通知(notice)系统正常运转了,现在再考虑这样一个新需求:对于一些机密通知,我们只想让部分人看到。这就要求交换机对绑定在其上的队列进行筛选,于是引出了有一个新的模型 Routing。

之前我们说过,对于 direct 类型的交换机,它会根据 routing key 进行路由,因此我们可以借助它来实现我们的需求,模型结构如下图:

在这里插入图片描述

下面用代码来实现。先看生产者。

首先要声明一个 direct 类型的交换机:

// 这里名称改为notice2
channel.exchangeDeclare("notice2", "direct");

需要注意的是,因为我们之前声明了一个 fanout 类型的名叫 notice 的交换机。因此不能再声明一个同名的类型却不一样的交换机。

然后可以发送消息了,我们发送 10 条 消息,其中偶数条消息是秘密消息,只能被 routing key 为 s 的队列接收,其余的消息所有的队列都能接收。

for (int i = 0; i < 10; i++) {
            String routingKey = "n"; // normal
            if (i % 2 == 0) {
                routingKey = "s"; // secret
            }
            channel.basicPublish("notice2", routingKey, null, String.valueOf(i).getBytes());
        }

接下来是消费者:

// 声明一个名称随机的临时的队列
String queueName = channel.queueDeclare().getQueue();
// 绑定交换机,同时带上routing key
channel.queueBind(queueName, "notice2", "n");
// 消费者2号运行时,打开以下注释
// channel.queueBind(queueName, "notice2", "s");

注意,我们可以多次调用队列绑定方法,调用时,队列名和交换机名都相同,而 routing key 不同,这样可以是一个队列带有多个 routing key。

以下是完整代码:

@Test
public void send() throws IOException, TimeoutException {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.exchangeDeclare("notice2", "direct");
    for (int i = 0; i < 10; i++) {
        String routingKey = "n"; // normal
        if (i % 2 == 0) {
            routingKey = "s"; // secret
        }
        channel.basicPublish("notice2", routingKey, null, String.valueOf(i).getBytes());
    }
    channel.close();
    connection.close();
}

@Test
public void receive() throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();
    channel.exchangeDeclare("notice2", "direct");

    String queueName = channel.queueDeclare().getQueue();
    channel.queueBind(queueName, "notice2", "n");
    // channel.queueBind(queueName, "notice2", "s");

    channel.basicConsume(queueName, true, new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            System.out.println(new String(body, "UTF-8"));
        }
    });
    synchronized (this) {
        wait();
    }
}

测试时,我们可以先运行一个 receive ,然后打开 channel.queueBind(queueName,“notice2”,“s”)注释,再 运行一次 运行一次 receive ,这样就有两个消费者绑定到 notice2 交换机上,其中消费者2只能收到 normal 类型的消息,而消费者2既能收到 normal 类型的消息,又能收到 secret 类型的消息。接着可以运行 send 方法。如不出意外,可以看到如下打印结果:

// 消费者1
1
3
5
7
9
// 消费者2
0
1
2
3
4
5
6
7
8
9

Topic

有了以上的改进,我们的 notice 系统基本 ok 了,但有些时候,我们好需要更加灵活的消息刷选方式。比如我们对于电影信息,我们可能需要对它的地区,类型,限制级进行筛选。这时候就要借助 Topic 模型了。

在 Topic 模型中,我们 升级 了routing key,它可以由多个关键词组成,词与词之间由点号(·)隔开。特别地,规定 * 表示任意的一个词;* 号表示任意的 0个或多个词。

加深我们现在需要接收电影信息,每条电影消息附带的 routing key 有地区、类型、限制级 3个关键字,即:district.type
.age 。现在想实现的功能如下图:

在这里插入图片描述

如上图所示,队列 Q1 只关心美国适合 13 岁以上的电影信息,队列 Q2 对动作片感兴趣,而队列 Q3 喜欢中国电影。

下面用 Java 代码去实现上述功能,相较于之前基本上没有什么改变,下面直接给出代码:

@Test
public void send() throws IOException, TimeoutException {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.exchangeDeclare("movie", "topic");

    channel.basicPublish("movie", "American.action.13", null, "The Bourne Ultimatum".getBytes());
    channel.basicPublish("movie", "American.comedy.R", null, "The Big Lebowski".getBytes());
    channel.basicPublish("movie", "American.romantic.13", null, "Titanic".getBytes());

    channel.basicPublish("movie", "Chinese.action.13", null, "卧虎藏龙".getBytes());
    channel.basicPublish("movie", "Chinese.comedy.13", null, "大话西游".getBytes());
    channel.basicPublish("movie", "Chinese.romantic.13", null, "梁山伯与祝英台".getBytes());

    channel.close();
    connection.close();
}

@Test
public void receive() throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();
    channel.exchangeDeclare("movie", "topic");
    // 队列1
    String queueName = channel.queueDeclare().getQueue();
    channel.queueBind(queueName, "movie", "American.*.13");
    // 队列2
//        String queueName = channel.queueDeclare().getQueue();
//        channel.queueBind(queueName, "movie", "*.action.*");
    // 队列3
//        String queueName = channel.queueDeclare().getQueue();
//        channel.queueBind(queueName, "movie", "Chinese.#");


    channel.basicConsume(queueName, true, new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            System.out.println(new String(body, "UTF-8"));
        }

    });
    synchronized (this) {
        wait();
    }
}

运行 3 次 receive 方法,注意打开或关闭相应的注释;再运行 send 方法,可以看到控制台输出如下内容:

// 消费者1
The Bourne Ultimatum
Titanic
// 消费者2
The Bourne Ultimatum
卧虎藏龙
// 消费者3
卧虎藏龙
大话西游
梁山伯与祝英台

RPC

第 6 种模型是用来做 RPC (Remote procedure call,远程程序调用)的。这里直接贴上代码,就不做解释了,想要了解更多细节,可参考 http://www.rabbitmq.com/tutorials/tutorial-six-java.html 。代码演示的是,客户端调用服务端的 fib 方法,得到返回结果。

RPCServer.java


import com.rabbitmq.client.*;
import com.rabbitmq.client.AMQP.BasicProperties;

/**
 * Description:
 *
 * @author derker
 * @Time 2016-10-26 18:24
 */
public class RPCServer {


    private static final String RPC_QUEUE_NAME = "rpc_queue";

    private static int fib(int n) {
        if (n == 0) return 0;
        if (n == 1) return 1;
        return fib(n - 1) + fib(n - 2);
    }

    public static void main(String[] argv) {
        Connection connection = null;
        Channel channel = null;
        try {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            connection = factory.newConnection();
            channel = connection.createChannel();
            channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
            channel.basicQos(1);
            QueueingConsumer consumer = new QueueingConsumer(channel);
            channel.basicConsume(RPC_QUEUE_NAME, false, consumer);
            System.out.println(" [x] Awaiting RPC requests");
            while (true) {
                String response = null;
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                AMQP.BasicProperties props = delivery.getProperties();
                BasicProperties replyProps = new BasicProperties
                        .Builder()
                        .correlationId(props.getCorrelationId())
                        .build();
                try {
                    String message = new String(delivery.getBody(), "UTF-8");
                    int n = Integer.parseInt(message);
                    System.out.println(" [.] fib(" + message + ")");
                    response = "" + fib(n);
                } catch (Exception e) {
                    System.out.println(" [.] " + e.toString());
                    response = "";
                } finally {
                    channel.basicPublish("", props.getReplyTo(), replyProps, response.getBytes("UTF-8"));
                    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (connection != null) {
                try {
                    connection.close();
                } catch (Exception ignore) {
                }
            }
        }
    }
}

RPCClient.java

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.AMQP.BasicProperties;
import java.util.UUID;

/**
 * Description:
 *
 * @author derker
 * @Time 2016-10-26 18:36
 */
public class RPCClient {
    private Connection connection;
    private Channel channel;
    private String requestQueueName = "rpc_queue";
    private String replyQueueName;
    private QueueingConsumer consumer;

    public RPCClient() throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        connection = factory.newConnection();
        channel = connection.createChannel();

        replyQueueName = channel.queueDeclare().getQueue();
        consumer = new QueueingConsumer(channel);
        channel.basicConsume(replyQueueName, true, consumer);
    }

    public String call(String message) throws Exception {
        String response = null;
        String corrId = UUID.randomUUID().toString();

        BasicProperties props = new BasicProperties
                .Builder()
                .correlationId(corrId)
                .replyTo(replyQueueName)
                .build();

        channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8"));

        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            if (delivery.getProperties().getCorrelationId().equals(corrId)) {
                response = new String(delivery.getBody(), "UTF-8");
                break;
            }
        }

        return response;
    }

    public void close() throws Exception {
        connection.close();
    }

    public static void main(String[] argv) {
        RPCClient fibonacciRpc = null;
        String response = null;
        try {
            fibonacciRpc = new RPCClient();
            System.out.println(" [x] Requesting fib(10)");
            response = fibonacciRpc.call("10");
            System.out.println(" [.] Got '" + response + "'");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (fibonacciRpc != null) {
                try {
                    fibonacciRpc.close();
                } catch (Exception ignore) {
                }
            }
        }
    }
}