秒杀项目(5)集成RabbitMQ

一、MQ队列的优缺点

1.1 MQ队列的缺点

1)系统更复杂,多了一个MQ组件

2)消息传递路径更长,延时会增加

3)消息可靠性和重复性互为矛盾,消息不丢不重难以同时保证

4)上游无法知道下游的执行结果,这一点是很致命的

1.2 MQ队列的优点

1)不需要预留buffer,上游任务执行完,下游任务总会在第一时间被执行

2)依赖多个任务,被多个任务依赖都很好处理,只需要订阅相关消息即可

3)有任务执行时间变化,下游任务都不需要调整执行时间

二、简单使用

2.1 AMQP简介

AMQP,即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。Erlang中的实现有RabbitMQ等。

2.2 添加依赖

<!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-amqp -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
    <version>2.1.7.RELEASE</version>
</dependency>

2.3 添加配置

#RabbitMQ配置
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
#消费者数量
spring.rabbitmq.listener.simple.concurrency=10
#消费者最大数量
spring.rabbitmq.listener.simple.max-concurrency=10
#消费,每次从队列中取多少个,取多了,可能处理不过来
spring.rabbitmq.listener.simple.prefetch=1
spring.rabbitmq.listener.auto-startup=true
#消费失败的数据重新压入队列
spring.rabbitmq.listener.simple.default-requeue-rejected=true
#发送,队列满的时候,发送不进去,启动重置
spring.rabbitmq.template.retry.enabled=true
#一秒钟之后重试
spring.rabbitmq.template.retry.initial-interval=1000
#
spring.rabbitmq.template.retry.max-attempts=3
#最大间隔 10s
spring.rabbitmq.template.retry.max-interval=10000
spring.rabbitmq.template.retry.multiplier=1.0

2.4 发送者

  • 发送者需要指定所使用的队列的名称
@Service
public class MQSend {
    public static final Logger log = LoggerFactory.getLogger(MQSend.class);
    @Autowired
    AmqpTemplate amqpTemplate;
    public void send(String str){
        amqpTemplate.convertAndSend(MQConfig.MQ_Queue,str);
        log.info("发送的消息是" + str);
    }
}

2.5 接收者

@Service
public class MQReceive {
    public static Logger log  = LoggerFactory.getLogger(MQReceive.class);
    @Autowired
    AmqpTemplate amqpTemplate;
    @RabbitListener(queues = MQConfig.MQ_Queue)
    public void receive(String str){
        log.info("接收到的消息" + str);
    }
}

测试:

@Controller
public class test {
    @Autowired
    MQSend mqSend;
    @RequestMapping("/hello")
    public String hello(){
        mqSend.send("你好呀");
        return "hello";
    }
}

结果:

2019-08-27 00:02:32.326  INFO 15288 --- [nio-8080-exec-1] com.mq.demo.MQSend                       : 发送的消息是你好呀
2019-08-27 00:02:32.337  INFO 15288 --- [ntContainer#0-2] com.mq.demo.MQReceive                    : 接收到的消息你好呀

2.6 MQ配置类

@Configuration
public class MQConfig {
	public static final String QUEUE="queue";
	public static final String MIAOSHA_QUEUE="miaosha.queue";
	
	public static final String TOPIC_QUEUE1="topic.queue1";
	public static final String TOPIC_QUEUE2="topic.queue2";
	public static final String HEADER_QUEUE="header.queue";
	public static final String TOPIC_EXCHANGE="topic.exchange";
	public static final String FANOUT_EXCHANGE="fanout.exchange";
	public static final String HEADER_EXCHANGE="header.exchange";
	public static final String ROUTINIG_KEY1="topic.key1";
	public static final String ROUTINIG_KEY2="topic.#";
	/**
	 * Direct模式,交换机Exchange:
	 * 发送者,将消息往外面发送的时候,并不是直接投递到队列里面去,而是先发送到交换机上面,然后由交换机发送数据到queue上面去,
	 * 做了依次路由。
	 */
	@Bean
	public Queue queue() {
		//名称,是否持久化
		return new Queue(QUEUE,true);5
	}
	
	@Bean
	public Queue miaoshaqueue() {
		//名称,是否持久化
		return new Queue(MIAOSHA_QUEUE,true);
	}

三、RabbitMQ的四种模式

3.1 direct交换机模式

在这里插入图片描述
发送者把消息发送到交换机上,再把消息发送到队列中

3.2 Topic模式

在这里插入图片描述

3.2.1 config类

@Configuration
public class MQConfig{
    public static final String MQ_Queue = "queue";
    public static final String Topic_Queue1 = "topic_queue1";
    public static final String Topic_Queue2 = "topic_queue2";
    public static final String Topic_Exchange = "topicExchange";
    public static final String Rout_Key1 = "topic.key1";
    public static final String Rout_Key2 = "topic.#";
    //#代表0个或者多个单词
    @Bean
    public Queue getQueue(){

         return new Queue(MQ_Queue, true);
    }
    @Bean
    public Queue getQueue1(){

        return new Queue(Topic_Queue1, true);
    }
    @Bean
    public Queue getQueue2(){

        return new Queue(Topic_Queue2, true);
    }
    @Bean
    public TopicExchange topicExchange(){
        return new TopicExchange(Topic_Exchange);
    }
    @Bean
    public Binding topicBinding1(){
        return BindingBuilder.bind(getQueue1()).to(topicExchange()).with(Rout_Key1);
    }
    @Bean
    public Binding topicBinding2(){
        return BindingBuilder.bind(getQueue2()).to(topicExchange()).with(Rout_Key2);
    }
}

需要自定义n个队列和n个key,和一个交换机,并把不同的队列与对应的key通过交换机进行绑定

3.2.2 发送者

@Service
public class MQSend {
    public static final Logger log = LoggerFactory.getLogger(MQSend.class);
    @Autowired
    AmqpTemplate amqpTemplate;
    public void sendTopic(String msg){
        log.info("发送的消息是" + msg);
        amqpTemplate.convertAndSend(MQConfig.Topic_Exchange, MQConfig.Rout_Key1,msg+"1");
        amqpTemplate.convertAndSend(MQConfig.Topic_Exchange, MQConfig.Rout_Key2,msg+"2");
    }
}

不同的队列发送的每个消息需要指定一个key值。

3.2.3 接受者

@Service
public class MQReceive {
    public static Logger log  = LoggerFactory.getLogger(MQReceive.class);
    @Autowired
    AmqpTemplate amqpTemplate;
    @RabbitListener(queues = MQConfig.Topic_Queue1)
    public void receive1(String str){
        log.info("接收到Queue1的消息" + str);
    }
    @RabbitListener(queues = MQConfig.Topic_Queue2)
    public void receive2(String msg){
        log.info("接收到Queue2的消息" + msg);
    }
}

每个接收的方法都指定监听一个队列的消息。他是系统自动调用的。每个队列绑定一个key,他通过识别消息的key值与自己绑定的key值进行对比来确定是否接受该消息。

3.3 Fanout模式(广播模式)

发送消息到FanoutExchange交换机上,然后就交换机将消息发给所有与之绑定的队列。

在这里插入图片描述

3.3.1 config:

@Configuration
public class MQConfig{
    public static final String MQ_Queue = "queue";
    public static final String Fanout_Queue1 = "Fanout_queue1";
    public static final String Fanout_Queue2 = "Fanout_queue2";
    public static final String Fanout_Exchange = "FanoutExchange";
    //#代表0个或者多个单词
    @Bean
    public Queue getQueue(){

        return new Queue(MQ_Queue, true);
    }
    @Bean
    public Queue getQueue1(){

        return new Queue(Fanout_Queue1, true);
    }
    @Bean
    public Queue getQueue2(){

        return new Queue(Fanout_Queue2, true);
    }
    @Bean
    public FanoutExchange FanoutExchange(){
        return new FanoutExchange(Fanout_Exchange);
    }
    @Bean
    public Binding topicBinding1(){
        return BindingBuilder.bind(getQueue1()).to(FanoutExchange());
    }
    @Bean
    public Binding topicBinding2(){
        return BindingBuilder.bind(getQueue2()).to(FanoutExchange());
    }
}

3.3.2 发送者

@Service
public class MQSend {
    public static final Logger log = LoggerFactory.getLogger(MQSend.class);
    @Autowired
    AmqpTemplate amqpTemplate;
    public void sendTopic(String msg){
        log.info("发送的消息是" + msg);
        amqpTemplate.convertAndSend(MQConfig.Fanout_Exchange,"", msg);
    }
}

3.3.3 接收者

@Service
public class MQReceive {
    public static Logger log  = LoggerFactory.getLogger(MQReceive.class);
    @Autowired
    AmqpTemplate amqpTemplate;
    @RabbitListener(queues = MQConfig.Fanout_Queue1)
    public void receive1(String str){
        log.info("接收到Queue1的消息" + str);
    }
    @RabbitListener(queues = MQConfig.Fanout_Queue2)
    public void receive2(String msg){
        log.info("接收到Queue2的消息" + msg);
    }
}

3.3.4 结果

结果:

2019-08-27 12:17:50.381  INFO 10804 --- [nio-8080-exec-1] com.mq.demo.MQSend                       : 发送的消息是你好呀
2019-08-27 12:17:50.480  INFO 10804 --- [tContainer#0-10] com.mq.demo.MQReceive                    : 接收到Queue2的消息你好呀
2019-08-27 12:17:50.480  INFO 10804 --- [ntContainer#1-1] com.mq.demo.MQReceive                    : 接收到Queue1的消息你好呀

3.4 Header模式

需要满足所有的key才能将消息发送到消息队列中,这些key是以键值对的形式存在map中。

3.4.1 config

@Configuration
public class MQConfig{
    public static final String Header_Queue = "Header_queue";
    public static final String Headers_Exchange = "HeadersExchange";
    //#代表0个或者多个单词
    @Bean
    public Queue getQueue(){

        return new Queue(Header_Queue, true);
    }
    @Bean
    public HeadersExchange HeadersExchange(){
        return new HeadersExchange(Headers_Exchange);
    }
    @Bean
    public Binding HeaderBinding1(){
        HashMap<String, Object> map = new HashMap<>();
        map.put("header1","hello");
        map.put("header2","world");
        return BindingBuilder.bind(getQueue()).to(HeadersExchange()).whereAll(map).match();
    }
}

3.4.2 发送者

@Service
public class MQSend {
    public static final Logger log = LoggerFactory.getLogger(MQSend.class);
    @Autowired
    AmqpTemplate amqpTemplate;
    public void sendTopic(String msg){
        log.info("发送的消息是" + msg);
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setHeader("header1","hello");
        messageProperties.setHeader("header2","world");
        Message obj = new Message(msg.getBytes(),messageProperties);
        amqpTemplate.convertAndSend(MQConfig.Headers_Exchange,"", obj);
    }
}

3.4.3 接收者

@Service
public class MQReceive {
    public static Logger log  = LoggerFactory.getLogger(MQReceive.class);
    @Autowired
    AmqpTemplate amqpTemplate;
    @RabbitListener(queues = MQConfig.Header_Queue)
    public void receive1(byte[] str){
        log.info("接收到Queue1的消息" + new String(str));
    }
}

3.4.4 测试

@Controller
public class test {
    @Autowired
    MQSend mqSend;
    @RequestMapping("/hello")
    @ResponseBody
    public String hello(){
        mqSend.sendTopic("你好呀");
        return "hello";
    }
}

结果:

-08-27 12:51:25.372  INFO 1452 --- [nio-8080-exec-1] com.mq.demo.MQSend                       : 发送的消息是你好呀
2019-08-27 12:51:25.467  INFO 1452 --- [tContainer#0-10] com.mq.demo.MQReceive                    : 接收到Queue1的消息你好呀

四、RabbitMQ的简单命令

4.1 打开RabbitMq的cmd

在这里插入图片描述

4.2 启动管理工具

rabbitmq-plugins enable rabbitmq_management

4.3 停止启动服务器命令

停止:net stop RabbitMQ
启动:net start RabbitMQ

4.4 浏览器中查看

http://127.0.0.1:15672/

在这里插入图片描述


版权声明:本文为qq_42306803原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。