rabbitmq 多个消费者消费一个队列_Spring整合RabbitMQ详解!(生产者工程、消费者工程--代码解析)...

8bfb932c5e8a800db60d53e11e16d6fc.png

1. Spring Boot整合RabbitMQ

1.1. 简介

在Spring项目中,可以使用Spring-Rabbit去操作RabbitMQ

尤其是在spring boot项目中只需要引入对应的amqp启动器依赖即可,方便的使用RabbitTemplate发

送消息,使用注解接收消息。

一般在开发过程中:

生产者工程

1. application.yml文件配置RabbitMQ相关信息;

2. 在生产者工程中编写配置类,用于创建交换机和队列,并进行绑定

3. 注入RabbitTemplate对象,通过RabbitTemplate对象发送消息到交换机

消费者工程

1. application.yml文件配置RabbitMQ相关信息

2. 创建消息处理类,用于接收队列中的消息并进行处理

1.2. 搭建生产者工程

1.2.1. 创建工程

创建生产者工程springboot-rabbitmq-producer

2145ed46da04150a6318a62a38d7d063.png

61adc6ecb20874b52cfd2be251f5d814.png

1.2.2. 添加依赖

修改pom.xml文件内容为如下:

```

<?xml version="1.0" encoding="UTF-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0"

xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0

http://maven.apache.org/xsd/maven-4.0.0.xsd">

<modelVersion>4.0.0</modelVersion>

<parent>

<groupId>org.springframework.boot</groupId>

<artifactId>spring-boot-starter-parent</artifactId>

<version>2.1.4.RELEASE</version>

</parent>

<groupId>com.itheima</groupId>

<artifactId>springboot-rabbitmq-producer</artifactId>

<version>1.0-SNAPSHOT</version>

<dependencies>

<dependency>

<groupId>org.springframework.boot</groupId>

<artifactId>spring-boot-starter-web</artifactId> </dependency>

<dependency>

<groupId>org.springframework.boot</groupId>

<artifactId>spring-boot-starter-amqp</artifactId>

</dependency>

</dependencies>

</project>

```

```

package com.itheima.rabbitmq;

import org.springframework.boot.SpringApplication;

import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication public class ProducerApplication {

public static void main(String[] args) {

SpringApplication.run(ProducerApplication.class);

}

}

```

### 1.2.4. 配置RabbitMQ

#### 1)配置文件

创建application.yml,内容如下:

```

spring:

rabbitmq:

host: localhost

port: 5672

virtual-host: /itcast

username: heima

password: heima

```

2)绑定交换机和队列

创建RabbitMQ队列与交换机绑定的配置类com.itheima.rabbitmq.config.RabbitMQConfig

```

package com.itheima.rabbitmq.config;

import org.springframework.amqp.core.*;

import org.springframework.beans.factory.annotation.Qualifier;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

@Configuration public class RabbitMQConfig {

//交换机名称

public static final String ITEM_TOPIC_EXCHANGE = "item_topic_exchange";

//队列名称

public static final String ITEM_QUEUE = "item_queue";

//声明交换机

@Bean("itemTopicExchange")

public Exchange topicExchange(){

return

ExchangeBuilder.topicExchange(ITEM_TOPIC_EXCHANGE).durable(true).build();

}

//声明队列

@Bean("itemQueue")

public Queue itemQueue(){

return QueueBuilder.durable(ITEM_QUEUE).build();

}

//绑定队列和交换机

@Bean

public Binding itemQueueExchange(@Qualifier("itemQueue") Queue queue,

@Qualifier("itemTopicExchange") Exchange

exchange){

return BindingBuilder.bind(queue).to(exchange).with("item.#").noargs();

}

}

```

1.2.5 消息发送Controller

我们创建一个SpringMVC的Controller方便我们进行测试

```

package com.itheima.rabbitmq.controller;

import com.itheima.rabbitmq.config.RabbitMQConfig;

import org.springframework.amqp.rabbit.core.RabbitTemplate;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.web.bind.annotation.GetMapping;

import org.springframework.web.bind.annotation.RequestParam;

import org.springframework.web.bind.annotation.RestController;

/***

发送消息的测试类

*/

@RestController

public class SendMsgController {

//注入RabbitMQ的模板

@Autowired

private RabbitTemplate rabbitTemplate;

/***

测试

*/ @GetMapping("/sendmsg")

public String sendMsg(@RequestParam String msg, @RequestParam String key){

/**

* 发送消息

* 参数一:交换机名称

* 参数二:路由key

* 参数三:发送的消息

*/

rabbitTemplate.convertAndSend(RabbitMQConfig.ITEM_TOPIC_EXCHANGE ,key ,msg);

//返回消息

return "发送消息成功!";

}

}

```

1.3. 搭建消费者工程

1.3.1. 创建工程

创建消费者工程springboot-rabbitmq-consumer

8f7ad78ec1bef4a41364d2bd11d15246.png

3c498ff0dcacd288a5857516798066a6.png

1.3.2. 添加依赖

修改pom.xml文件内容为如下:

```

<?xml version="1.0" encoding="UTF-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0"

xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0

http://maven.apache.org/xsd/maven-4.0.0.xsd">

<modelVersion>4.0.0</modelVersion>

<parent>

<groupId>org.springframework.boot</groupId>

<artifactId>spring-boot-starter-parent</artifactId> <version>2.1.4.RELEASE</version>

</parent>

<groupId>com.itheima</groupId>

<artifactId>springboot-rabbitmq-consumer</artifactId>

<version>1.0-SNAPSHOT</version>

<dependencies>

<dependency>

<groupId>org.springframework.boot</groupId>

<artifactId>spring-boot-starter-web</artifactId>

</dependency>

<dependency>

<groupId>org.springframework.boot</groupId>

<artifactId>spring-boot-starter-amqp</artifactId>

</dependency>

</dependencies>

```

1.3.3. 启动类

1.3.4. 配置RabbitMQ

创建application.yml,内容如下:

```

spring:

rabbitmq:

host: localhost

port: 5672

virtual-host: /itcast

username: heima

password: heima

```

编写消息监听器com.itheima.rabbitmq.listener.MyListener

```

package com.itheima.rabbitmq.listener;

import org.springframework.amqp.rabbit.annotation.RabbitListener;

import org.springframework.stereotype.Component;

@Component

public class MyListener { /*** 监听某个队列的消息 *

@param message

接收到的消息 */

@RabbitListener(queues = "item_queue")

public void myListener1(String message){

System.out.println("消费者接收到的消息为:" + message); } }

```

1.3.6. 测试

2e7533165578b650ec5dd8db73bc0c11.png

在生产者工程springboot-rabbitmq-producer中创建测试类,发送消息:

```

package com.itheima.rabbitmq;

import com.itheima.rabbitmq.config.RabbitMQConfig;

import org.junit.Test; import org.junit.runner.RunWith;

import org.springframework.amqp.rabbit.core.RabbitTemplate;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.boot.test.context.SpringBootTest;

import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)

@SpringBootTest

public class RabbitMQTest {

@Autowired

private RabbitTemplate rabbitTemplate;

@Test public void test(){

rabbitTemplate.convertAndSend(RabbitMQConfig.ITEM_TOPIC_EXCHANGE,

"item.insert", "商品新增,routing key 为item.insert"); rabbitTemplate.convertAndSend(RabbitMQConfig.ITEM_TOPIC_EXCHANGE,

"item.update", "商品修改,routing key 为item.update"); rabbitTemplate.convertAndSend(RabbitMQConfig.ITEM_TOPIC_EXCHANGE,

"item.delete", "商品删除,routing key 为item.delete");

}

}

```

先运行上述测试程序(交换机和队列才能先被声明和绑定),然后启动消费者;在消费者工程

springboot-rabbitmq-consumer中控制台查看是否接收到对应消息。

另外;也可以在RabbitMQ的管理控制台中查看到交换机与队列的绑定:

bd02243064f62e7d5d5b167a966e655e.png

版权声明:本文为weixin_39828859原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。