springBoot集成rabbitMq
1.创建虚拟主机
Users可以创建用户,Virtual Hosts创建虚拟主机

2.给用户授权虚拟机

3.创建对列

持久化:如果选durable,则队列消息自动持久化到磁盘上,如果选transient,则不会持久化;
自动删除:默认值no,如果yes,则在消息队列没有使用的情况下,队列自行删除。
4.创建交换机

自动删除:默认值no,如果是yes,则在将所有队列与交换机取消绑定之后,交换机将自动删除。
交换机类型:
- fanout:广播类型
- direct:路由类型
- topic:通配符类型,基于消息的路由键路由
- headers:通配符类型,基于消息的header路由
内部交换器:默认值no,如果是yes,消息无法直接发送到该交换机,必须通过交换机的转发才能到达
次交换机。本交换机只能与交换机绑定。
5.交换机和对列绑定

6.创建父工程
rabbitmqparent
├── rabbitmq-consumer
└── rabbitmq-provider
rabbitmq-provider
pom依赖
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.18.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<dependencies>
<!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-amqp -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!--rabbit测试依赖-->
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
</dependency>
<!--springBoot测试依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
application
server.port=8081
# rabbitMq服务地址
spring.rabbitmq.host=127.0.0.1
# rabbitMq端口号
spring.rabbitmq.port=5672
# rabbitMq虚拟主机地址
spring.rabbitmq.virtual-host=admin
# rabbitMq用户名
spring.rabbitmq.username=guest
# rabbitMq密码
spring.rabbitmq.password=guest
启动类
@SpringBootApplication
public class ProviderApplication {
public static void main(String[] args) {
SpringApplication.run(ProviderApplication.class, args);
}
}
无法启动
添加web依赖解决

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
rabbitmq-consumer
pom依赖
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.18.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<dependencies>
<!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-amqp -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!--rabbit测试依赖-->
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
</dependency>
<!--springBoot测试依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
</dependencies>
application
server.port=8080
# rabbitMq服务地址
spring.rabbitmq.host=127.0.0.1
# rabbitMq端口号
spring.rabbitmq.port=5672
# rabbitMq虚拟主机地址
spring.rabbitmq.virtual-host=admin
# rabbitMq用户名
spring.rabbitmq.username=guest
# rabbitMq密码
spring.rabbitmq.password=guest
启动类
@SpringBootApplication
public class ConsumerApplication {
public static void main(String[] args) {
SpringApplication.run(ConsumerApplication.class,args);
}
}
7.五种工作模式
7.1简单模式 Hello World
记得先创建队列

provider生产者发生消息
@RunWith(SpringRunner.class)
@SpringBootTest
public class HelloWorld {
@Autowired
private RabbitTemplate rabbitTemplate;
// 编写生产者发送消息到对列
@Test
public void sendHollWorld() {
/**
* 参数1:对列名称
* 参数2:要发送的消息
*/
rabbitTemplate.convertAndSend("simple_queue", "你好,Hello World");
}
}
consumer消费者接受消息
/**
* 消息监听器
* 消费者,接收消息队列消息监听器
* 必须将当前监听器对象注入Spring的容器中
* queues = "simple_queue" 队列,要与生产者保持一致
*/
@Component
@RabbitListener(queues = "simple_queue")
public class HelloWorldListener {
/**
* 接受消息,业务处理
*
* @param msg
*/
@RabbitHandler
public void simpleHandler(String msg) {
System.out.println("接收到的消息是 : " + msg);
}
}
7.2工作队列模式 Work queues
记得先创建队列
于简单模式相比,增加了消费者,代码基本一样

provider生产者发生消息
@SpringBootTest
@RunWith(SpringRunner.class)
public class WorkQueues {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void sendWork() {
/**
* 参数1:对列名称
* 参数2:要发送的消息
*/
rabbitTemplate.convertAndSend("work_queue", "我是work模式");
}
}
consumer消费者接受消息01
/**
* 消息监听器
* 消费者,接收消息队列消息监听器
* 必须将当前监听器对象注入Spring的容器中
* queues = "work_queue" 要与发送消息保持一致
*/
@Component
@RabbitListener(queues = "work_queue")
public class WorkListener01 {
/**
* 接受消息,业务处理
*
* @param msg
*/
@RabbitHandler
public void simpleHandler(String msg) {
System.out.println("work监听消息01 : " + msg);
}
}
consumer消费者接受消息02
/**
* 消息监听器
* 消费者,接收消息队列消息监听器
* 必须将当前监听器对象注入Spring的容器中
* queues = "work_queue" 要与发送消息保持一致
*/
@Component
@RabbitListener(queues = "work_queue")
public class WorkListener02 {
/**
* 接受消息,业务处理
*
* @param msg
*/
@RabbitHandler
public void simpleHandler(String msg) {
System.out.println("work监听消息02 : " + msg);
}
}
7.3Exchange常见类型
Exchange有常见以下3种类型:
- Fanout:广播 将消息交给所有绑定到交换机的队列, 不处理路由键。只需要简单的将队列绑定到
交换机上。fanout 类型交换机转发消息是最快的。 - Direct:定向 把消息交给符合指定routing key 的队列. 处理路由键。需要将一个队列绑定到交换
机上,要求该消息与一个特定的路由键完全匹配。如果一个队列绑定到该交换机上要求路由键
“dog”,则只有被标记为 “dog” 的消息才被转发,不会转发 dog.puppy,也不会转发 dog.guard,
只会转发dog。
其中,路由模式使用的是 direct 类型的交换机。 - Topic:主题(通配符) 把消息交给符合routing pattern(路由模式)的队列. 将路由键和某模式进
行匹配。此时队列需要绑定要一个模式上。符号 “#” 匹配一个或多个词,符号""匹配不多不少一个
词。因此“audit.#” 能够匹配到“audit.irs.corporate”,但是“audit.” 只会匹配到 “audit.irs”。
其中,主题模式(通配符模式)使用的是 topic 类型的交换机。
Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑
定,或者没有符合路由规则的队列,那么消息会丢失
7.4Publish/Subscribe发布与订阅模式
简单理解,我发送100条消息,交换机收到100条,他会给队列一100条消息,会给队列二100条消息

发布订阅模式:
1、每个消费者监听自己的队列。
2、生产者将消息发给broker,由交换机将消息转发到绑定此交换机的每个队列,每个绑定交换机的
队列都将接收到消息
【广播消息:一次性将消息发送给所有消费者,每个消费者收到消息均一致】
创建2个队列

创建交换机并绑定队列

provider生产者发生消息
@SpringBootTest
@RunWith(SpringRunner.class)
public class publishExchange {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void sendPublish() {
/**
* 参数一:交换机
* 参数二:路由键(空)
* 参数三:发送的消息
*/
rabbitTemplate.convertAndSend("publish_exchange", "", "我是发布订阅模式");
}
}
consumer消费者接受消息01
@Component
@RabbitListener(queues = "publish_queue1")
public class PublishListener01 {
@RabbitHandler
public void simpleHandler(String msg){
System.out.println("publish 01: " + msg);
}
}
consumer消费者接受消息02
@Component
@RabbitListener(queues = "publish_queue2")
public class PublishListener02 {
@RabbitHandler
public void simpleHandler(String msg){
System.out.println("publish 02: " + msg);
}
}
7.5路由模式 Routing
简单理解:发布与订阅模式是将消息分别全部发送到队列,而路由模式是在交换机里配置路由键,生产者可以指定发送到那个路由键里,路由键有和对列一一对应,这样就会进入相应的对列

创建2个队列

创建交换机并绑定队列

provider生产者发生消息
@SpringBootTest
@RunWith(SpringRunner.class)
public class RoutingExchange {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void sendRouting() {
/**
* 参数一:交换机
* 参数二:路由键
* 参数三:发送的消息
*/
for (int i = 0; i < 100; i++) {
if (i/2==0){
rabbitTemplate.convertAndSend("routing_exchange", "info", "我是发布订阅模式"+i+"info");
}else {
rabbitTemplate.convertAndSend("routing_exchange", "error", "我是发布订阅模式"+i+"error");
}
}
}
}
consumer消费者接受消息01
@Component
@RabbitListener(queues = "routing_queue1")
public class RoutingListener01 {
@RabbitHandler
public void simpleHandler(String msg){
System.out.println("routing_queue1: " + msg);
}
}
consumer消费者接受消息02
@Component
@RabbitListener(queues = "routing_queue2")
public class RoutingListener02 {
@RabbitHandler
public void simpleHandler(String msg){
System.out.println("routing_queue2: " + msg);
}
}
7.6主题模式(Topics通配符模式)

面的路由模式是根据路由key进行完整的匹配(完全相等才发送消息),这里的通配符模式通俗的来讲就是模糊匹配。
符号“#”表示匹配一个或多个词,符号“*”表示匹配一个词。
举例:
item.#: 能够匹配item.insert.abc.bbc 或者item.insert
item.*:只能匹配item.insert
创建2个队列

创建交换机并绑定队列

provider生产者发生消息
@SpringBootTest
@RunWith(SpringRunner.class)
public class TopicsExchange {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void sendTopics() {
/**
* 参数一:交换机
* 参数二:路由键
* 参数三:发送的消息
*/
rabbitTemplate.convertAndSend("topics_exchange","item.insert","这条item.#和item.*都会有");
rabbitTemplate.convertAndSend("topics_exchange","item.insert.abc","这条item.#有");
}
}
consumer消费者接受消息01
@Component
@RabbitListener(queues = "topics_queu1")
public class TopicsListener01 {
@RabbitHandler
public void simpleHandler(String msg){
System.out.println("topics_queue1: " + msg);
}
}
consumer消费者接受消息02
@Component
@RabbitListener(queues = "topics_queu2")
public class TopicsListener01 {
@RabbitHandler
public void simpleHandler(String msg){
System.out.println("topics_queue2: " + msg);
}
}
7.7总结
工作模式:
- 1、简单模式 HelloWorld : 一个生产者、一个消费者,不需要设置交换机(使用默认的交换机)
- 2、工作队列模式 Work Queue: 一个生产者、多个消费者(竞争关系),不需要设置交换机(使用默
认的交换机) - 3、发布订阅模式 Publish/subscribe: 需要设置类型为fanout的交换机,并且交换机和队列进行绑定,
当发送消息到交换机后,交换机会将消息广播发送到绑定的队列 - 4、路由模式 Routing: 需要设置类型为direct的交换机,交换机和队列进行绑定,并且指定routing
key,当发送消息到交换机后,交换机会根据routing key将消息发送到对应的队列 - 5、通配符模式 Topic: 需要设置类型为topic的交换机,交换机和队列进行绑定,并且指定通配符方式
的routing key,当发送消息到交换机后,交换机会根据routing key将消息发送到对应的队列
8.高级篇
生产者确认模式
在生产者(发送消息放)
application.properties
# 开启生产者确认模式:(confirm),投递到交换机,不论失败或者成功都回调
spring.rabbitmq.publisher-confirms=true
# 开启生产者回退模式:(returns),交换机将消息路由到队列,出现异常则回调
spring.rabbitmq.publisher-returns=true
confirm(config配置)
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
@Component
public class MessageConfirmCallback implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* * 创建RabbitTemplate对象之后执行当前方法,为模板对象设置回调确认方法
* * 设置消息确认回调方法
* * 设置消息回退回调方法
*
*/
@PostConstruct
public void initRabbitTemplate() {
//设置消息确认回调方法
rabbitTemplate.setConfirmCallback(this::confirm);
rabbitTemplate.setReturnCallback(this::returnedMessage);
}
/**
* 投递到交换机,不论投递成功还是失败都回调次方法
* @param correlationData 投递相关数据
* @param ack 是否投递到交换机
* @param cause 投递失败原因
*
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack) {
System.out.println("消息进入交换机成功");
} else {
System.out.println("消息进入交换机失败, 失败原因:" + cause);
}
}
/**
* 当消息投递到交换机,交换机路由到消息队列中出现异常,执行returnedMessaged方法
* @param message 投递消息内容
* @param replyCode 返回错误状态码
* @param replyText 返回错误内容
* @param exchange 交换机名称
* @param routingKey 路由键
*
*/
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
System.out.println("交换机路由至消息队列出错:>>>>>>>");
System.out.println("交换机:" + exchange);
System.out.println("路由键:" + routingKey);
System.out.println("错误状态码:" + replyCode);
System.out.println("错误原因:" + replyText);
System.out.println("发送消息内容:" + message.toString());
System.out.println("<<<<<<<<");
}
}
消费者确认签收
消息确认的三种类型:
- 自动确认:acknowledge=“none”
- 手动确认:acknowledge=“manual”
- 根据异常情况确认:acknowledge=“auto”,(这种方式使用麻烦)
application.properties
# 配置开启手动签收
# 简单模式的开启手动签收
spring.rabbitmq.listener.simple.acknowledge-mode=manual
# 路由模式开启手动签收
spring.rabbitmq.listener.direct.acknowledge-mode=manual
# 是否支持重试
spring.rabbitmq.listener.direct.retry.enabled=true
消费者接受消息
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@RabbitListener(queues = "routing_queue1")
public class RoutingListener01 {
@RabbitHandler
public void simpleHandler(String msg, Message message, Channel channel) throws IOException {
System.out.println("routing_queue1: " + msg);
//获取投递标签
MessageProperties messageProperties =
message.getMessageProperties();
long deliveryTag = messageProperties.getDeliveryTag();
try {
// 模拟异常
// if (msg.contains("苹果")) {
// throw new RuntimeException("不允许卖苹果手机!!!");
//}
/**
* 手动签收消息
* 参数1:消息投递标签
* 参数2:是否批量签收:true一次性签收所有,false,只签收当前消息
*/
channel.basicAck(deliveryTag, false);
System.out.println("手动签收完成:{}");
} catch (Exception ex) {
/**
* 手动拒绝签收
* 参数1:当前消息的投递标签
* 参数2:是否批量签收:true一次性签收所有,false,只签收当前消息
* 参数3:是否重回队列,true为重回队列,false为不重回
*/
channel.basicNack(deliveryTag, false, true);
System.out.println("拒绝签收,重回队列:{}" + ex);
}
}
}