SpringAMPQ操作RabbitMQ

1. 直接使用com.rabbitmq.client

引入依赖: 下面的依赖中包含了com.rabbitmq.client

  <!--AMQP依赖,包含RabbitMQ-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

生产者代码:

@Test
    public void testSendMessage() throws IOException, TimeoutException {
        // 1.建立连接
        ConnectionFactory factory = new ConnectionFactory();
        // 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
        factory.setHost("192.168.1.121");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setUsername("admin");
        factory.setPassword("123");
        // 1.2.建立连接
        Connection connection = factory.newConnection();

        // 2.创建通道Channel
        Channel channel = connection.createChannel();

        // 3.创建队列
        String queueName = "simple.queue";
        channel.queueDeclare(queueName, false, false, false, null);

        // 4.发送消息
        String message = "hello, rabbitmq!";
        channel.basicPublish("", queueName, null, message.getBytes());
        System.out.println("发送消息成功:【" + message + "】");

        // 5.关闭通道和连接
        channel.close();
        connection.close();

    }

消费者代码:

 public static void main(String[] args) throws IOException, TimeoutException {
        // 1.建立连接
        ConnectionFactory factory = new ConnectionFactory();
        // 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
        factory.setHost("192.168.1.121");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setUsername("admin");
        factory.setPassword("123");
        // 1.2.建立连接
        Connection connection = factory.newConnection();

        // 2.创建通道Channel
        Channel channel = connection.createChannel();

        // 3.创建队列
        String queueName = "simple.queue";
        channel.queueDeclare(queueName, false, false, false, null);

        // 4.订阅消息
        channel.basicConsume(queueName, true, new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                // 5.处理消息
                String message = new String(body);
                System.out.println("接收到消息:【" + message + "】");
            }
        });
        System.out.println("等待接收消息。。。。");
    }

上面大生产者消费者代码可以完成消息的发送和接受,但是操作步骤比较繁琐,因此使用SpringAMPQ来简化操作,下面的代码演示中都需要上面的依赖


2. 简单队列

在这里插入图片描述

生产者代码:

配置application.yml文件

spring:
  rabbitmq:
    host: x.x.x.x# 主机名
    port: 5672 # 端口
    virtual-host: / # 虚拟主机
    username: admin # 用户名
    password: 123 # 密码
 @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testSimpleQueue(){

        String queueName="simple.queue1";
        String msg="hello";
        RabbitAdmin rabbitAdmin=new RabbitAdmin(rabbitTemplate);
        Queue queue=new Queue(queueName);//队列如果不存在则创建队列
        rabbitAdmin.declareQueue(queue);
        rabbitTemplate.convertAndSend(queueName,msg);
    }

消费者代码:

配置队列监听器,代码位于main文件夹中,由spring容器管理

@Component
public class SpringRabbitListener {

    @RabbitListener(queues = "simple.queue1")
    public void listenSimpleQueueMessage(String msg) throws InterruptedException {
        System.out.println("spring 消费者接收到消息:【" + msg + "】");
    }
}

3. WorkQueue

多个消费者绑定到一个队列,共同消费队列中的消息

在这里插入图片描述

生产者代码:

每发送一次休眠20ms是为了避免发送太快导致消息被一个队列消费

 @Test
    public void testWorkQueue() throws InterruptedException {

        String queueName="simple.queue";
        String msg="hello";
        RabbitAdmin rabbitAdmin=new RabbitAdmin(rabbitTemplate);
        Queue queue=new Queue(queueName);//队列如果不存在则创建队列
        rabbitAdmin.declareQueue(queue);
        /*
        发送50条消息
         */
        for(int i=1;i<=50;i++){
            rabbitTemplate.convertAndSend(queueName,msg+i);
            Thread.sleep(20); //1s发送50条消息
        }
    }

消费者代码:

 @RabbitListener(queues = "simple.queue")
    public void listenWorkQueue1(String msg) throws InterruptedException {
        System.out.println("消费者1接收到消息:【" + msg + "】");
        Thread.sleep(20);//1s接受50条消息
    }
    @RabbitListener(queues = "simple.queue")
    public void listenWorkQueue2(String msg) throws InterruptedException {
        System.err.println("消费者2接收到消息:【" + msg + "】");
        Thread.sleep(200);//1s接受5条消息
    }

在这里插入图片描述

上面的消息可以看出来对于消费者1消费的全部都是偶数编号的消息,因为生产者在往队列中每发送一个消息时,消费者都会将消息先取出来放到自己的缓存中,然后再消费,这样会产生一个问题,当某个队列的消息处理能力较差时,它还是取出了队列中的一半的消息(假定只有2个消费者),此时为了减少消息的处理时间可以根据消费者的处理能力来按能力分配消息数量

消费者的application.yml配置

spring:
  rabbitmq:
    host: x.x.x.x # 主机名
    port: 5672 # 端口
    virtual-host: / # 虚拟主机
    username: admin # 用户名
    password: 123 # 密码
    listener:
      simple:
        prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息

在这里插入图片描述


4. 发布订阅模式

在这里插入图片描述

  • Publisher:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给X(交换机)

  • Exchange:交换机,图中的X。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。Exchange有以下3种类型:

    • Fanout:广播,将消息交给所有绑定到交换机的队列
    • Direct:定向,把消息交给符合指定routing key 的队列
    • opic:通配符,把消息交给符合routing pattern(路由模式) 的队列
  • Consumer:消费者,与以前一样,订阅队列,没有变化

  • Queue:消息队列也与以前一样,接收消息、缓存消息。

Exchange(交换机)只负责转发消息,不具备存储消息的能力


5. Fanout

在这里插入图片描述

生产者代码:

 @Test
    public void testFanoutExchange(){
        String exchangeName="fanout";//交换机名称
        String msg="hello";
        rabbitTemplate.convertAndSend(exchangeName,"",msg);
    }

消费者代码:

配置类,声明交换机队列,并绑定二者

@Configuration
public class FanoutConfig {
    /*
    声明交换机
     */
    @Bean
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange("fanout");
    }

    /*
    声明队列1
     */
    @Bean
    public Queue fanoutQueue1(){
        return new Queue("fanout.queue1");
    }
    /*
    将队列1和交换机绑定
    spring会自动根据形参注入前面声明的两个beanfanoutQueue1 fanoutExchange
     */
    @Bean
    public Binding binding1(Queue fanoutQueue1,FanoutExchange fanoutExchange){
        return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
    }
    /*
   声明队列2
    */
    @Bean
    public Queue fanoutQueue2(){
        return new Queue("fanout.queue2");
    }
    /*
    将队列2和交换机绑定
     */
    @Bean
    public Binding binding2(Queue fanoutQueue2,FanoutExchange fanoutExchange){
        return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
    }
}

监听器:

 @RabbitListener(queues = "fanout.queue1")
    public void listenFanoutQueue1(String msg) {
        System.out.println("消费者1接收到消息:【" + msg + "】");
    }

    @RabbitListener(queues = "fanout.queue2")
    public void listenFanoutQueue2(String m) {
        System.err.println("消费者2接收到消息:【" + m + "】");
    }

在这里插入图片描述

生产者发送的消息被fanout交换机发送到了两个队列中


6. Direct

Fanout模式中,一条消息,会被所有订阅的队列都消费。但是,在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange

在这里插入图片描述

  • 队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)
  • 消息的发送方在 向 Exchange发送消息时,也必须指定消息的 RoutingKey
  • Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey与消息的 Routing key完全一致,才会接收到消息

生产者代码:

@Test
    public void testDirectExchange(){
        String exchangeName="direct";
        String msg="hello";
        //yellow是routingKey  
        //routingKey为blue时只会队列1
        //routingKey为yellow时只会队列2
        //routingKey为red时会同时发送给队列1队列2
        rabbitTemplate.convertAndSend(exchangeName,"yellow",msg);
    }

消费者代码:

这里代码采用的是注解声明队列和交换机以及绑定关系

 /*
    队列1绑定direct交换机  路由键为red blue
     */
    @RabbitListener(bindings =@QueueBinding(
            value = @Queue(name = "direct.queue1"),
            exchange = @Exchange(name = "direct",type = ExchangeTypes.DIRECT),
            key = {"red","blue"}
    ))
    public void listenDirectQueue1(String msg){
        System.out.println("消费者1接收到消息:【" + msg + "】");
    }

    /*
    队列2绑定direct交换机  路由键为red yellow
     */
    @RabbitListener(bindings =@QueueBinding(
            value = @Queue(name = "direct.queue2"),
            exchange = @Exchange(name = "direct",type = ExchangeTypes.DIRECT),
            key = {"red","yellow"}
    ))
    public void listenDirectQueue2(String msg){
        System.out.println("消费者2接收到消息:【" + msg + "】");
    }

7. Topic

Topic类型的ExchangeDirect相比,都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定Routing key 的时候使用通配符!

Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: china.province

通配符规则:

#:匹配一个或多个词

*:匹配不多不少恰好1个词

举例:

china.#:能够匹配china.province 或者 china.province.city

china.*:只能匹配china.province

生产者代码:

 @Test
    public void testSendTopicExchange(){
        String exchangeName="topic";
        String msg="hello";
        //队列1绑定的是china.#  可以匹配china.province以及china.province.city等 china后面跟多个单词
        //队列2绑定的是china.*  可以匹配china.province china后面跟1个单词
         rabbitTemplate.convertAndSend(exchangeName,"china.province.city",msg);
         // rabbitTemplate.convertAndSend(exchangeName,"china.province",msg);
    
    }

消费者代码:

/*
   队列1绑定topic交换机  路由键为china.#
    */
    @RabbitListener(bindings =@QueueBinding(
            value = @Queue(name = "topic.queue1"),
            exchange = @Exchange(name = "topic",type = ExchangeTypes.TOPIC),
            key = {"china.#"}
    ))
    public void listenTopicQueue1(String msg){
        System.out.println("消费者1接收到消息:【" + msg + "】");
    }

    /*
    队列2绑定topic交换机  路由键为china.*
     */
    @RabbitListener(bindings =@QueueBinding(
            value = @Queue(name = "topic.queue2"),
            exchange = @Exchange(name = "topic",type = ExchangeTypes.TOPIC),
            key = {"china.*"}
    ))
    public void listenTopicQueue2(String msg){
        System.out.println("消费者2接收到消息:【" + msg + "】");
    }

8. 消息转化器

ATPQ使用的是默认的jdl的序列化器,当发送字符串时在队列中查看和从队列中获取并没有什么不同,当发送对象或集合类型的就不一样了,比如改一下7中的生产者代码,发送一个map对象

 @Test
    public void testSendTopicExchange(){
        String exchangeName="topic";
        Map<String,Integer> map=new HashMap<>();
        map.put("tom",12);
        String msg="hello";
        //队列1绑定的是china.#  可以匹配china.province以及china.province.city等 china后面跟多个单词
        //队列2绑定的是china.*  可以匹配china.province china后面跟1个单词
        rabbitTemplate.convertAndSend(exchangeName,"china.province.city",map);
    }

此时查看RabbitMQ的管理界面:
在这里插入图片描述

jdk序列化的内容较长而且不易看懂,并且在接收消息时原来使用的形参是String类型,这里需要改成Object类型,为了消息体的体积更小、可读性更高,因此可以使用JSON方式来做序列化和反序列化。

操作:

导入jackson的依赖

<dependency>
    <groupId>com.fasterxml.jackson.dataformat</groupId>
    <artifactId>jackson-dataformat-xml</artifactId>
    <version>2.9.10</version>
</dependency>

然后在代码中添加一个config配置类:

@Configuration
public class MsgConvertConfig {
    @Bean
    public MessageConverter jsonMessageConverter(){
        return new Jackson2JsonMessageConverter();
    }
}

队列中的消息存储形式:

在这里插入图片描述

监听消息的方法签名修改如下:

public void listenTopicQueue2(Map<String,Object> msg){}

在这里插入图片描述


版权声明:本文为qq_43478694原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。