导入依赖
<dependencies>
<!-- junit-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!--web开发的起步依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.9.9</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
</dependencies>
消息消费者
配置文件
spring:
rabbitmq:
host: 192.168.200.130 # ip
port: 5672
username: admin
password: admin
virtual-host: /itcast
listener:
type: simple
simple:
prefetch: 1 #消费者每次从队列获取的消息数量
concurrency: 2 #消费者数量
max-concurrency: # 启动消费者最大数量
server:
port: 8001
生产消息
@Test
public void testSend(){
for (int i=0;i<10;i++){
rabbitTemplate.convertAndSend(RabbitMQConfig1.QUEUE_NAME,"这是普通模式");
}
}
接收消息
@Component
public class RabbimtMQListener {
@RabbitListener(queues = "boot_queue")
public void ListenerQueue(Message message){
System.out.println("消费者1"+new String(message.getBody())+System.currentTimeMillis());
模拟处理需要1秒
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
结果打印
可靠性传递
配置文件
spring:
rabbitmq:
host: 192.168.200.130 # ip
port: 5672
username: admin
password: admin
virtual-host: /itcast
listener:
type: simple
simple:
prefetch: 1 #消费者每次从队列获取的消息数量
concurrency: 2 #消费者数量
max-concurrency: # 启动消费者最大数量
publisher-confirms: true #确认消息已发送到交换机(Exchange) 可以把publisher-confirms: true 替换为 publisher-confirm-type: correlated
publisher-returns: true #确认消息已发送到队列(Queue)
server:
port: 8001
配置文件
@Configuration
public class RabbitMQConfig1 {
public static final String QUEUE_NAME = "boot_queue";
@Bean
public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate();
rabbitTemplate.setConnectionFactory(connectionFactory);
//设置消息投递失败的策略,有两种策略:自动删除或返回到客户端。
//我们既然要做可靠性,当然是设置为返回到客户端(true是返回客户端,false是自动删除)
rabbitTemplate.setMandatory(true);
//消息发送到交换机,是否成功收到消息,true成功,false失败
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println();
System.out.println("相关数据:" + correlationData);
if (ack) {
System.out.println("投递成功,确认情况:" + ack);
} else {
System.out.println("投递失败,确认情况:" + ack);
System.out.println("原因:" + cause);
}
}
});
//消息发送到交换机,在有交换机发送到队列失败,才会执行
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
System.out.println();
System.out.println("ReturnCallback: " + "消息:" + message);
System.out.println("ReturnCallback: " + "回应码:" + replyCode);
System.out.println("ReturnCallback: " + "回应信息:" + replyText);
System.out.println("ReturnCallback: " + "交换机:" + exchange);
System.out.println("ReturnCallback: " + "路由键:" + routingKey);
System.out.println();
}
});
return rabbitTemplate;
}
}
消费端手动确认
@Component
public class RabbimtMQListener3 {
@RabbitListener(queues = "boot_queue")
public void onMessage(Message message, Channel channel) throws Exception {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
System.out.println(message.toString());
// int i=1/0;
channel.basicAck(deliveryTag,true); //手动确认
System.out.println("手动确认");
} catch (IOException e) {
//拒绝签收
//第三个参数,重回队列重新发送
channel.basicNack(deliveryTag,true,true);
System.out.println("拒绝签收");
}
}
}
版权声明:本文为qq_46859110原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。