springboot整合rabbitmq

springboot整合rabbitmq

依赖

<!-- mq -->
<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

根据springboot自动配置原理,查看rabbitmq的自动配置类
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
可以看到rabbitmq的配置信息来自RabbitProperties
创建rabbitmq.properties配置文件以spring.rabbitmq为前缀
rabbitmq.properties

spring.rabbitmq.host=192.168.3.114
spring.rabbitmq.port=5672
spring.rabbitmq.username=root
spring.rabbitmq.password=root
#连接到代理时用的虚拟主机
spring.rabbitmq.virtualHost=/dev
#连接到server的地址列表(以逗号分隔),先addresses后host
#spring.rabbitmq.addresses
#请求心跳超时时间,0为不指定,如果不指定时间单位默认为秒
#spring.rabbitmq.requested-heartbeat=0
#是否启用【发布确认】,默认false
#spring.rabbitmq.publisher-confirms=false
#是否启用【发布返回】,默认false
#spring.rabbitmq.publisher-returns=false
#连接超时时间,单位毫秒,0表示永不超时 
#spring.rabbitmq.connection-timeout=0

#=========================SSL=====================
#是否支持ssl,默认false
#spring.rabbitmq.ssl.enabled=false
#持有SSL certificate的key store的路径
#spring.rabbitmq.ssl.key-store=
#访问key store的密码
#spring.rabbitmq.ssl.key-store-password 
#持有SSL certificates的Trust store
#spring.rabbitmq.ssl.trust-store=
#访问trust store的密码
#spring.rabbitmq.ssl.trust-store-password= 
#JKS:Trust store 类型.
#spring.rabbitmq.ssl.trust-store-type=
#ssl使用的算法,默认由rabiitClient配置
#spring.rabbitmq.ssl.algorithm= 
#是否启用服务端证书验证
#spring.rabbitmq.ssl.validate-server-certificate=true
#是否启用主机验证
#spring.rabbitmq.ssl.verify-hostname=true 

#===========cache===============
#缓存中保持的channel数量
#spring.rabbitmq.cache.channel.size=
#当缓存数量被设置时,从缓存中获取一个channel的超时时间,单位毫秒;如果为0,则总是创建一个新channel
#spring.rabbitmq.cache.channel.checkout-timeout=
#缓存的channel数,只有是CONNECTION模式时生效
#spring.rabbitmq.cache.connection.size=
#连接工厂缓存模式:channel 和 connection
#spring.rabbitmq.cache.connection.mode=channel

#===================listener=========================
# 容器类型.simple或direct
#direct: 在这个容器中,监听器被直接在RabbitMQ消费者线程上调用。
#simple: RabbitMQ用户将消息发送给调用线程的容器  默认simple
#spring.rabbitmq.listener.type=simple  
#是否启动时自动启动容器
#spring.rabbitmq.listener.simple.auto-startup=true
	#auto:消费者根据程序执行正常或者抛出异常来决定是提交ack或者nack
	#manual: 手动ack,用户必须手动提交ack或者nack
	#none: 没有ack机制
#表示消息确认方式,其有三种配置方式,分别是none、manual和auto;默认auto
#spring.rabbitmq.listener.simple.acknowledge-mode=
#最小的消费者数量
#spring.rabbitmq.listener.simple.concurrency=5
#最大的消费者数量
#spring.rabbitmq.listener.simple.max-concurrency=10
#一个消费者最多可处理的nack消息数量,如果有事务的话,必须大于等于transaction数量.
#spring.rabbitmq.listener.simple.prefetch=
#当ack模式为auto时,一个事务(ack间)处理的消息数量,最好是小于等于prefetch的数量.若大于prefetch, 则prefetch将增加到这个值
#spring.rabbitmq.listener.simple.transaction-size=
#决定被拒绝的消息是否重新入队;默认是true(与参数acknowledge-mode有关系)
#spring.rabbitmq.listener.simple.default-requeue-rejected=
#若容器声明的队列在代理上不可用,是否失败; 或者运行时一个多多个队列被删除,是否停止容器
#spring.rabbitmq.listener.simple.missing-queues-fatal=true
#发布空闲容器的时间间隔,单位毫秒
#spring.rabbitmq.listener.simple.idle-event-interval=
#监听重试是否可用
#spring.rabbitmq.listener.simple.retry.enabled=false
#最大重试次数
#spring.rabbitmq.listener.simple.retry.max-attempts=3
#最大重试时间间隔
#spring.rabbitmq.listener.simple.retry.max-interval=10000ms
#第一次和第二次尝试传递消息的时间间隔
#spring.rabbitmq.listener.simple.retry.initial-interval=1000ms
#应用于上一重试间隔的乘数
#spring.rabbitmq.listener.simple.retry.multiplier=1
# 重试时有状态or无状态
#spring.rabbitmq.listener.simple.retry.stateless=true
 
#==============template=========================
#为true时,如果exchange根据routingKey将消息路由到queue时找不到匹配的queue,触发return回调,为false时,exchange直接丢弃消息。
#spring.rabbitmq.template.mandatory=false
# receive() 操作的超时时间
#spring.rabbitmq.template.receive-timeout=
#sendAndReceive() 操作的超时时间
#spring.rabbitmq.template.reply-timeout=
#发送重试是否可用  
#spring.rabbitmq.template.retry.enabled=true
#最大重试次数
#spring.rabbitmq.template.retry.max-attempts=3
#第一次和第二次尝试发布或传递消息之间的间隔
#spring.rabbitmq.template.retry.initial-interva=1000ms 
#应用于上一重试间隔的乘数
#spring.rabbitmq.template.retry.multiplier=1
#最大重试时间间隔
#spring.rabbitmq.template.retry.max-interval=10000

创建RabbitMqConfig配置类加载rabbitmq.properties,并创建交换机、队列、绑定等

package com.conformity.rabbitmq;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.Binding.DestinationType;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;


@Configuration
@PropertySource(value = {"classpath:rabbitmq.properties"},encoding = "utf-8")
public class RabbitMqConfig {
	
	/** 将rabbitmq服务器队列存的消息转成json格式
	 * @return
	 */
	@Bean
	public MessageConverter setMessageConverter() {
		return new Jackson2JsonMessageConverter();
	}
	
	/**
	 * 创建队列交换机
	 */
	//点对点模式
	@Bean
	public Exchange directExchange() {
		//第一个参数:交换机名称  第二个参数  是否持久化  第三个参数是否自动删除
		return new DirectExchange("direct.exchange", true, false);
	}
	
	//发布订阅
	@Bean
	public Exchange fanoutExchange() {
		return new FanoutExchange("fanout.exchange", true, false);
	}
	
	//主题
	@Bean
	public Exchange topExchange() {
		return new TopicExchange("topic.exchange", true, false);
	}
	
	/**
	 * 创建队列
	 */
	@Bean
	public Queue directQueue() {
		return new Queue("direct.queue", true);
	}
	
	@Bean
	public Queue fanoutQueue() {
		return new Queue("fanout.queue", true);
	}
	@Bean
	public Queue topicQueue() {
		return new Queue("topic.queue", true);
	}
	
	/**
	 * 绑定
	 */
	 //点对点模式  
	@Bean
	public Binding bindingDir() {
		 return new Binding("direct.queue", DestinationType.QUEUE, "direct.exchange","direct.test",null);
	}
	//广播模式 广播类型的交换机绑定三个队列
	@Bean
	public Binding bindingFan1() {
		return new Binding("fanout.queue", DestinationType.QUEUE, "fanout.exchange","",null);
	}
	@Bean
	public Binding bindingFan2() {
		return new Binding("direct.queue", DestinationType.QUEUE, "fanout.exchange","",null);
	}
	@Bean
	public Binding bindingFan3() {
		return new Binding("topic.queue", DestinationType.QUEUE, "fanout.exchange","",null);
	}
	//主题模式的交换机绑定两个队列
	@Bean
	public Binding bindingTOP1() {
		return new Binding("topic.queue", DestinationType.QUEUE, "topic.exchange","top.#",null);
	}
	@Bean
	public Binding bindingTOP2() {
		return new Binding("direct.queue", DestinationType.QUEUE, "topic.exchange","dir.#",null);
	}
}

补充:关于Queue构造参数
在这里插入图片描述

测试
controller

package com.conformity.general.controller;

import javax.annotation.Resource;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

import com.conformity.general.entity.test.MybatisTestUser;
import com.github.xiaoymin.knife4j.annotations.ApiOperationSupport;

import io.swagger.annotations.Api;
import io.swagger.annotations.ApiImplicitParam;
import io.swagger.annotations.ApiImplicitParams;
import io.swagger.annotations.ApiOperation;
import springfox.documentation.annotations.ApiIgnore;

@RestController
@RequestMapping(value = "/rabbit",method = RequestMethod.POST)
@Api(tags = {"rabbitmq的测试类"})
public class RabbitmqController {

	@Resource
	private RabbitTemplate rabbitTemplate;
	
	@RequestMapping("/sendMessage1")
	@ApiOperation("rabbitMq测试---点对点发送消息")
	@ApiOperationSupport(author = "lsx",order = 1)
	@ApiImplicitParams({   
        @ApiImplicitParam(name = "age", value = "年龄", required = true, paramType = "query"),           
        @ApiImplicitParam(name = "name", value = "姓名", required = true, paramType = "query"),           
        @ApiImplicitParam(name = "email", value = "邮箱", required = true, paramType = "query"),          
	})
	public String sendMessage1(@ApiIgnore MybatisTestUser user) {
		try {
			rabbitTemplate.convertAndSend("direct.exchange", "direct.test", user);
		} catch (Exception e) {
			e.printStackTrace();
			return "发送失败";
		}
		return "发送成功";
		
	}
	@RequestMapping("/sendMessage2")
	@ApiOperation("rabbitMq测试---广播发送消息")
	@ApiOperationSupport(author = "lsx",order = 2)
	@ApiImplicitParams({   
		@ApiImplicitParam(name = "age", value = "年龄", required = true, paramType = "query"),           
		@ApiImplicitParam(name = "name", value = "姓名", required = true, paramType = "query"),           
		@ApiImplicitParam(name = "email", value = "邮箱", required = true, paramType = "query"),          
	})
	public String sendMessage2(@ApiIgnore MybatisTestUser user) {
		try {
			rabbitTemplate.convertAndSend("fanout.exchange", "aaa", user);
		} catch (Exception e) {
			e.printStackTrace();
			return "发送失败";
		}
		return "发送成功";
		
	}
	@RequestMapping("/sendMessage3")
	@ApiOperation("rabbitMq测试---主题发送消息")
	@ApiOperationSupport(author = "lsx",order = 3)
	@ApiImplicitParams({   
		@ApiImplicitParam(name = "age", value = "年龄", required = true, paramType = "query"),           
		@ApiImplicitParam(name = "name", value = "姓名", required = true, paramType = "query"),           
		@ApiImplicitParam(name = "email", value = "邮箱", required = true, paramType = "query"),          
		@ApiImplicitParam(name = "type", value = "主题top/dir", required = true, paramType = "query"),          
	})
	public String sendMessage3(@ApiIgnore MybatisTestUser user,@ApiIgnore String type) {
		try {
			rabbitTemplate.convertAndSend("topic.exchange", type+".aaa", user);
		} catch (Exception e) {
			e.printStackTrace();
			return "发送失败";
		}
		return "发送成功";
		
	}
}

service

package com.conformity.general.service;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

@Service
public class RabbitMqService {
	
	@RabbitListener(queues = {"direct.queue"})
	public void getMessage1(Object object) {
		System.out.println("direct.queue========"+object);
	}
	@RabbitListener(queues = {"fanout.queue"})
	public void getMessage2(Object object) {
		System.out.println("fanout.queue========"+object);
	}
	@RabbitListener(queues = {"topic.queue"})
	public void getMessage3(Object object) {
		System.out.println("topic.queue========"+object);
	}
}

rabbitmq管理界面
在这里插入图片描述
在这里插入图片描述
看到交换机和队列都没有RabbitMqConfig配置的信息
启动springboot,成功启动后,再次观察rabbitmq管理界面
在这里插入图片描述
在这里插入图片描述
交换机和队列中都出现了我们配置的信息
通过访问controller进行测试
在这里插入图片描述
后台控制台,输出对应信息 到此整合完成
在这里插入图片描述

rabbitmq消息可靠性代码测试

代码变动
service

@RabbitListener(queues = {"direct.queue"})
	public void getMessage1(Message message) {
		Object parse = JSONObject.parse(new String(message.getBody())); 
		JSONObject user = (JSONObject)parse;
		System.out.println("direct.queue========"+user);
	}
	@RabbitListener(queues = {"fanout.queue"})
	public void getMessage2(Message message) {
		Object parse = JSONObject.parse(new String(message.getBody())); 
		JSONObject user = (JSONObject)parse;
		System.out.println("fanout.queue========"+user);
	}
	@RabbitListener(queues = {"topic.queue"})
	public void getMessage3(Message message) {
		Object parse = JSONObject.parse(new String(message.getBody())); 
		JSONObject user = (JSONObject)parse;
		System.out.println("topic.queue========"+user);
	}

controller

@RestController
@RequestMapping(value = "/rabbit",method = RequestMethod.POST)
@Api(tags = {"rabbitmq的测试类"})
public class RabbitmqController {

	@Resource
	private RabbitTemplate rabbitTemplate;

	@RequestMapping("/sendMessage1")
	@ApiOperation("rabbitMq测试---点对点发送消息")
	@ApiOperationSupport(author = "lsx",order = 1)
	@ApiImplicitParams({   
		@ApiImplicitParam(name = "age", value = "年龄", required = true, paramType = "query"),           
		@ApiImplicitParam(name = "name", value = "姓名", required = true, paramType = "query"),           
		@ApiImplicitParam(name = "email", value = "邮箱", required = true, paramType = "query"),          
	})
	public String sendMessage1(@ApiIgnore MybatisTestUser user) {
		try {
			rabbitTemplate.convertAndSend("direct.exchange", "direct.test",JSONObject.toJSON(user));
		} catch (Exception e) {
			e.printStackTrace();
			return "发送失败";
		}
		return "发送成功";

	}
	@RequestMapping("/sendMessage2")
	@ApiOperation("rabbitMq测试---广播发送消息")
	@ApiOperationSupport(author = "lsx",order = 2)
	@ApiImplicitParams({   
		@ApiImplicitParam(name = "age", value = "年龄", required = true, paramType = "query"),           
		@ApiImplicitParam(name = "name", value = "姓名", required = true, paramType = "query"),           
		@ApiImplicitParam(name = "email", value = "邮箱", required = true, paramType = "query"),          
	})
	public String sendMessage2(@ApiIgnore MybatisTestUser user) {
		try {
			rabbitTemplate.convertAndSend("fanout.exchange", "aaa", JSONObject.toJSON(user));
		} catch (Exception e) {
			e.printStackTrace();
			return "发送失败";
		}
		return "发送成功";

	}

	@RequestMapping("/sendMessage3")
	@ApiOperation("rabbitMq测试---主题发送消息")
	@ApiOperationSupport(author = "lsx",order = 3)
	@ApiImplicitParams({   
		@ApiImplicitParam(name = "age", value = "年龄", required = true, paramType = "query"),           
		@ApiImplicitParam(name = "name", value = "姓名", required = true, paramType = "query"),           
		@ApiImplicitParam(name = "email", value = "邮箱", required = true, paramType = "query"),          
		@ApiImplicitParam(name = "type", value = "主题top/dir", required = true, paramType = "query"),          
	})
	public String sendMessage3(@ApiIgnore MybatisTestUser user,@ApiIgnore String type) {
		try {
			rabbitTemplate.convertAndSend("topic.exchange", type+".aaa", JSONObject.toJSON(user));
		} catch (Exception e) {
			e.printStackTrace();
			return "发送失败";
		}
		return "发送成功";

	}

	@RequestMapping("/sendMessage4")
	@ApiOperation("rabbitMq测试---消息可靠性投递")
	@ApiOperationSupport(author = "lsx",order = 4)
	@ApiImplicitParams({   
		@ApiImplicitParam(name = "age", value = "年龄", required = true, paramType = "query"),           
		@ApiImplicitParam(name = "name", value = "姓名", required = true, paramType = "query"),           
		@ApiImplicitParam(name = "email", value = "邮箱", required = true, paramType = "query"),          
	})
	public void sendMessage4(@ApiIgnore MybatisTestUser user) {
		CorrelationData correlationData = new CorrelationData();
		byte[] body = JSONObject.toJSONBytes(user, SerializerFeature.QuoteFieldNames);
		Message message = new Message(body , new MessageProperties());
		correlationData.setReturned(new ReturnedMessage(message, 0, "", "test-confirm", "direct.test"));
		//写错交换机的名称
		rabbitTemplate.convertAndSend("test-confirm","direct.test",JSONObject.toJSON(user),correlationData);
		//写错路由键的名称
//		rabbitTemplate.convertAndSend("direct.exchange", "5555",JSONObject.toJSON(user));
	}

RabbitMqConfig

@Bean
   public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
       RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
       rabbitTemplate.setConfirmCallback((correlationData,ack,cause)->{
		if (ack) {
			System.out.println("消息发送成功:"+correlationData.getId());
		}else {
			Message message = correlationData.getReturned().getMessage();
			Object parse = JSONObject.parse(new String(message.getBody()));
			System.out.println("失败消息:"+parse);
			System.err.println("发送失败原因:"+cause);
			System.out.println("插入数据库或者重发");
		}
	});

	rabbitTemplate.setReturnsCallback((returnedMessage)->{
		byte[] body = returnedMessage.getMessage().getBody();
		System.err.println("未进入队列消息:"+new String(body));
		System.err.println("应答码:"+returnedMessage.getReplyCode());
		System.err.println("原因:"+returnedMessage.getReplyText());
		System.err.println("路由建:"+returnedMessage.getRoutingKey());
		System.err.println("交换机:"+returnedMessage.getExchange());
		//重新发送该消息
		rabbitTemplate.convertAndSend("direct.exchange", "direct.test",returnedMessage.getMessage());
	});
	rabbitTemplate.setMandatory(true);
	rabbitTemplate.setMessageConverter(setMessageConverter());
       return rabbitTemplate;
   }

rabbitmq.properties

#是否启用【发布确认】,默认false
spring.rabbitmq.publisher-confirms=true
#是否启用【发布返回】,默认false
spring.rabbitmq.publisher-returns=true

其他不变

可靠性消息投递

基于confirm模式和returns模式
当消息未到达交换机时发生消息丢失
在这里插入图片描述
在该方法中故意写错交换机的名称,因此消息无法到达交换机,在消息发送后会进入RabbitMqConfig配置的ConfirmCallback中
在这里插入图片描述
当消息进入交换机但未能进入队列中,发生消息丢失
在
这次故意写错路由键,因此消息无法进入队列,在消息发送后会进入RabbitMqConfig配置的ReturnsCallback中
在这里插入图片描述
但是当服务器宕机不可用的情况下,还是有可能发生数据丢失。可以搭建rabbitmq集群,或者借助其他缓存工具先缓存,消息发送成功后再删除等等。

消息消费确认

代码改动
rabbitmq.properties

#表示消息确认方式,其有三种配置方式,分别是none、manual和auto;默认auto
	#auto:消费者根据程序执行正常或者抛出异常来决定是提交ack或者nack
	#manual: 手动ack,用户必须手动提交ack或者nack
	#none: 没有ack机制
spring.rabbitmq.listener.simple.acknowledge-mode=manual
#一个消费者最多可处理的nack消息数量,如果有事务的话,必须大于等于transaction数量.
spring.rabbitmq.listener.simple.prefetch=10

其他配置不变
service

@RabbitListener(queues = {"direct.queue"})
	public void getMessage1(Message message,Channel channel) {
		try {
			//业务处理正常
			Object parse = JSONObject.parse(new String(message.getBody())); 
			JSONObject user = (JSONObject)parse;
			int a = 5/0;
			channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
			System.out.println("业务处理完成");
			System.out.println("direct.queue========"+user);
		} catch (Exception e) {
			//业务处理不正常
			try {
				channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,true);
				System.out.println("业务处理失败");
			} catch (IOException e1) {
				e1.printStackTrace();
			}
		}
		
	}

controller

@RequestMapping("/sendMessage5")
	@ApiOperation("rabbitMq测试---消费确认")
	@ApiOperationSupport(author = "lsx",order = 5)
	@ApiImplicitParams({   
		@ApiImplicitParam(name = "age", value = "年龄", required = true, paramType = "query"),           
		@ApiImplicitParam(name = "name", value = "姓名", required = true, paramType = "query"),           
		@ApiImplicitParam(name = "email", value = "邮箱", required = true, paramType = "query"),          
	})
	public String sendMessage5(@ApiIgnore MybatisTestUser user,@ApiIgnore String type) {
		try {
			rabbitTemplate.convertAndSend("direct.exchange","direct.test", JSONObject.toJSON(user));
		} catch (Exception e) {
			e.printStackTrace();
			return "发送失败";
		}
		return "发送成功";

	}

将确认代码注释掉之后,后台虽然接收了消息,但是在rabbitmq管理界面中消息依然存在
在这里插入图片描述
在这里插入图片描述
将消费确认代码开启后,重启服务,还是能接收到消息,此时返回给rabbitmq返回ack
在这里插入图片描述
rabbitmq管理界面的消息没有了

在这里插入图片描述
如果返回的是Nack的话在这里插入图片描述

在这里插入图片描述
后台一直接收到该消息,并且管理界面是ready状态,因此会一直监听到该消息这和channel.basicNack方法的第三个参数有关
对于channel.basicNack方法的第三个参数,表示消息nack后是否返回队列,如果设置为true,表示返回队列,此时消息处于队列头部,消费者会一直处理该消息,影响后续消息的消费,设置为false时表示不返回队列,此时如果设置有DLX(死信队列),那么消息会进入DLX中,后续再对该消息进行相应的处理,如果没有设置DLX,此时消息就会被丢弃。
到此消息确认测试完毕。

优先级队列测试

创建优先级队列

//优先级队列
	@Bean
	public Queue proQueue() {
		/**
		 * 第一个参数队列名称
		 * 第二个参数 是否持久化
		 * 第三个参数 是否创建排他队列
		 * 第四个参数 是否自动删除
		 * 第五个参数 队列参数配置
		 */
		Map<String, Object> arguments = new HashMap<>();
		arguments.put("x-max-priority", 10);
		return new Queue("priorityQueue", true, false, false, arguments);
	}
@Bean
	public Binding proBinding() {
		return new Binding("priorityQueue", DestinationType.QUEUE, "direct.exchange", "direct.priority", null);
	}

controller:

@RequestMapping("/sendMessage6")
	@ApiOperation("rabbitMq测试---测试优先级队列")
	@ApiOperationSupport(author = "lsx",order = 6)
	@ApiImplicitParams({   
	})
	public String sendMessage6() {
		try {
			for(int i= 1;i<=10;i++) {
				int a = new Random().nextInt(9);
				JSONObject jsonObject = new JSONObject();
				jsonObject.put("priority", a);
				//通过MessagePostProcessor 配置优先级
				rabbitTemplate.convertAndSend("direct.exchange","direct.priority", jsonObject, (message)->{
					message.getMessageProperties().setPriority(a);
					return message;
				});
			}
		} catch (Exception e) {
			e.printStackTrace();
			return "发送失败";
		}
		return "发送成功";

	}

service:

//@RabbitListener(queues = {"priorityQueue"})
	public void getMessage4(Message message,Channel channel) {
		try {
			JSONObject parse = JSONObject.parseObject(new String(message.getBody(),"utf-8"));
			System.out.println("priorityQueue========开始消费");
			Set<String> keySet = parse.keySet();
			for (String key : keySet) {
				System.out.println(key+"="+parse.get(key));
			}
			System.out.println("priorityQueue========消费结束");
			channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
		} catch (Exception e) {
			e.printStackTrace();
			try {
				channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
			} catch (IOException e1) {
				// TODO Auto-generated catch block
				e1.printStackTrace();
			}
		} 
		
	}

先后发送优先级从1到10 总共10条消息,同时开启监听,看日志消息的消费顺序
在这里插入图片描述
开启消费查看消费记录
在这里插入图片描述
消费顺序按照优先级从大到小


版权声明:本文为admin522043032原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。