RabbitMQ是一个开源的消息代理和队列服务器,用来通过普通协议在完全不同的应用之间共享数据,RabbitMQ是使用Erlang语言来编写的,并且RabbitMQ是基于AMQP协议的。
一、消息队列
1、MQ:Message Queue释义
服务之间最常见的通信方式是直接调用彼此来通信,消息从一端发出后立即就可以达到另一端,称为即时消息通讯(同步通信)
消息从某一端发出后,首先进入一个容器进行临时存储,当达到某种条件后,再由这个容器发送给另一端,称为延迟消息通讯(异步通信)
2、消息队列特点
- 解耦:使用了消息队列后,只要保证消息格式不变,消息的发送方和接收方并不需要彼此联系,也不需要受对方的影响,即解耦
- 提速:发收消息速度增加
- 广播:只要发一次消息
- 错峰与流控:流量削锋也是消息队列中的常用场景,一般在秒杀或团抢活动中使用广泛。
二、RabbitMQ的结构
1、消息队列协议-----AMQP
一个提供统一消息服务的应用层标准高级消息队列协议,是一个通用的应用层协议
消息发送与接受的双方遵守这个协议可以实现异步通讯.这个协议约定了消息的格式和工作方式.
2、技术选型

3、RabbitMQ

4、 结构初识

- Server(Broker):接收客户端连接,实现AMQP协议的消息队列和路由功能的进程.
- Virtual Host:虚拟主机的概念,类似权限控制组,一个Virtual Host里可以有ExchangeQueue.
- Exchange:交换机,接收生产者发送的消息,并根据Routing Key将消息路由到服务器中的队列Queue.
- ExchangeType:交换机类型决定了路由消息行为,RabbitMQ中有三种类型Exchange,分别是fanout、direct、topic.
- Message Queue:消息队列,用于存储还未被消费者消费的消息.
- Message:由Header和body组成,Header是由生产者添加的各种属性的集合,包括Message是否被持久化、优先级是多少、由哪个Message Queue接收等.body是真正需要发送的数据内容.
- BindingKey:绑定关键字,将一个特定的Exchange和一个特定的Queue绑定起来.
三、Docker中部署RabbitMQ
1、拉取镜像
docker pull rabbitmq:management
*注意获取镜像的时候要获取management版本的,不要获取last版本的,management版本的才带有管理界面

2、 运行
docker run -d \
--name my-rabbitmq \
-p 5672:5672 -p 15672:15672 \
-v /home/rabbitmq:/var/lib/rabbitmq \
--hostname my-rabbitmq-host \
-e RABBITMQ_DEFAULT_VHOST=my_vhost \
-e RABBITMQ_DEFAULT_USER=admin \
-e RABBITMQ_DEFAULT_PASS=admin \
--restart=always \
rabbitmq:management
--hostname:主机名(RabbitMQ的一个重要注意事项是它根据所谓的 “节点名称” 存储数据,默认为主机名)
-e:指定环境变量:
RABBITMQ_DEFAULT_VHOST:默认虚拟机名
RABBITMQ_DEFAULT_USER:默认的用户名
RABBITMQ_DEFAULT_PASS:默认用户名的密码

3、springboot连接配置
- 配置账号



*切记需要授权
四、搭建rabbitmq项目
1、需要建立生产者和消费者
生产者和消费者需要继承父类

2、父类导入amqp依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
3、yml文件配置
server:
port: 8080
spring:
application:
name: xx
rabbitmq:
host: 192.168.44.138
password: 123456
port: 5672
username: springboot
virtual-host: my_vhost
4、生产者 Provider
package com.xhy.provider; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration @SuppressWarnings("all") public class RabbitConfig { @Bean public Queue firstQueue() { return new Queue("firstQueue"); } }
![]()
在测试类里启动,就会新建一个firstQueue:
5.创建Sender类
模拟队列中的发送消息
package com.xhy.provider;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
@SuppressWarnings("all") public class Sender {
@Autowired
private AmqpTemplate rabbitTemplate;
public void sendFirst() {
rabbitTemplate.convertAndSend("firstQueue", "Hello World");
}
}
6、在测试类调用senderFirst方法
模拟消费者下单后给生产者发送消息
package com.xhy.provider;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
class ProviderApplicationTests {
@Autowired
private Sender sender;
@Test
void contextLoads() {
sender.sendFirst();
}
}
7.此时RabbitMQ内出现一条消息队列

8. 在consumer的yml文件中配置相关信息
server:
port: 8081
spring:
application:
name: xx
rabbitmq:
host: 192.168.44.138
password: 123456
port: 5672
username: springboot
virtual-host: my_vhost
注意:端口号与provider不一致
9.在consumer中创建Receiver类
模拟生产者接收消息
@RabbitListener表示监听消息队列
package com.xhy.consumer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
@SuppressWarnings("all")
@Slf4j
@RabbitListener(queues = "firstQueue") public class Receiver {
@RabbitHandler
public void process(String msg) {
log.warn("接收到:" + msg);
}
}
10.运行consumer模块就可以接收消息

11.为了使消息变成动态的,写一个用户类
package com.xhy.consumer.provider;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.Setter;
@SuppressWarnings("all")
@Data
@Setter
@AllArgsConstructor
@NoArgsConstructor
public class User{
private String username;
private String userpwd;
}
12.Sender
package com.xhy.provider;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
@SuppressWarnings("all") public class Sender {
@Autowired
private AmqpTemplate rabbitTemplate;
public void sendFirst() {
rabbitTemplate.convertAndSend("firstQueue", "Hello World");
}
public void sendFirst(User user) {
rabbitTemplate.convertAndSend("firstQueue", user);
}
public void sendFirst(String json) {
rabbitTemplate.convertAndSend("firstQueue", json);
}
}
13.
package com.xhy.provider;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.SneakyThrows;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
class ProviderApplicationTests {
@Autowired
private Sender sender;
@Test
@SneakyThrows
void contextLoads() {
User user=new User("aa","bb");
ObjectMapper mapper=new ObjectMapper();
sender.sendFirst(mapper.writeValueAsString(user));
}
}
14.運行
