消息确认
在消息队列中,消费者消费了信息,但是却没能成功执行信息中的事务,如果此时消息队列中的消息已经被消费了,那么显然是不可取的。
那么如何保证消息被成功消费了呢?在RabbitMq的消息队列中存在确认机制,我们大致可以将消息队列中的消息分成3种状态。
- 已准备,刚被生产者放入队列消息
- 待确认,被消费者接收的消息,还未发送确认信息
- 已确认消费,消费者接收到消息后发送确认信息
在RabbitMq管理页面中也体现了这三种状态的消息,如果客户端与RabbitMq失去连接,而且消息还没有被确认,那么待确认的消息会重新变成已准备的状态。
注意Total表示的是待确认和已准备的消息数量,不包括已经消费完的数量。
java客户端发送确认信息
- 设置自动确认
在java客户端中接收消息时可以设置时候自动确认,默认是false,就是手动确认。
第二个参数就是设置自动确认属性
- 手动确认
调用basicAck方法
交换机的使用
交换机的类型如下:
- direct
这种类型的交换机的路由规则是根据一个routingKey的标识,交换机通过一个routingKey与队列绑定 ,在生产者生产消息的时候指定一个routingKey,当绑定的队列的routingKey与生产者发送的一样,那么交换机会把这个消息发送给对应的队列。
- fanout
这种类型的交换机路由规则很简单,只要与他绑定了的队列, 他就会把消息发送给对应队列(与routingKey没关系)。
- topic
这种类型的交换机路由规则也是和routingKey有关,只不过topic可以根据*
和#
( *
代表过滤一单词,#
代表过滤后面所有单词, 用.
隔开)来识别routingKey。
假设我绑定的routingKey有队列A和B,A的routingKey是*.user
,B的routingKey是#.user
。
那么我生产一条消息routingKey为error.user
那么此时 2个队列都能接受到, 如果改为topic.error.user
那么这时候,只有B能接受到了。
- headers
这个类型的交换机很少用到,他的路由规则 与routingKey无关 而是通过判断header参数来识别的, 基本上没有应用场景,因为上面的三种类型已经能应付了。
这里只介绍前面三种交换机direct、fanout和topic,需要注意的是这里的匹配都是和交换机和队列相关,是与交换机和队列绑定的时候和发送消息的时候相关。
fanout
交换机直接绑定到队列上,这样只要绑定了交换机的队列都可以接收到数据,与路由键的匹配没有联系。
public static void sendByFanoutExchange(String message) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
//声明队列
channel.queueDeclare(ConnectionUtil.QUEUE_NAME,true,false,false,null);
// 声明exchange
channel.exchangeDeclare(ConnectionUtil.EXCHANGE_NAME, "fanout");
//交换机和队列绑定,最后一个参数是路由键
channel.queueBind(ConnectionUtil.QUEUE_NAME, ConnectionUtil.EXCHANGE_NAME, "");
channel.basicPublish(ConnectionUtil.EXCHANGE_NAME, "", null, message.getBytes());
System.out.println("发送的信息为:" + message);
channel.close();
connection.close();
}
direct
同样需要绑定交换机,同时会判断路由键是否相等,首先判断交换机是否绑定了队列,其次判断路由键是否相等。
public static void sendByDirectExchange(String message) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(ConnectionUtil.QUEUE_NAME,true,false,false,null);
// 声明exchange
channel.exchangeDeclare("direct"+ConnectionUtil.EXCHANGE_NAME, "direct");
// 交换机和队列绑定
// 第三个参数是这个队列与交换机绑定时的路由键,相当于是这个队列的路由键
channel.queueBind(ConnectionUtil.QUEUE_NAME, "direct"+ConnectionUtil.EXCHANGE_NAME, "test");
// 第二个参数是路由键,属于这个消息的路由键
// 在Direct类型的交换机下这个路由键和上面绑定的路由键相等的时候队列才能接收消息
channel.basicPublish("direct"+ConnectionUtil.EXCHANGE_NAME, "tes", null, message.getBytes());
System.out.println("发送的信息为:" + message);
channel.close();
connection.close();
}
topic
原理同direct,不过对于路由键的匹配可以进行模糊判断,支持*
和#
的匹配。
public static void sendByTopicExchange(String message) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(ConnectionUtil.QUEUE_NAME+"error",true,false,false,null);
channel.queueDeclare(ConnectionUtil.QUEUE_NAME+"debug",true,false,false,null);
channel.queueDeclare(ConnectionUtil.QUEUE_NAME+"user",true,false,false,null);
// 声明exchange
channel.exchangeDeclare("topic"+ConnectionUtil.EXCHANGE_NAME, "topic");
// 交换机和队列绑定
// 第三个参数是这个队列与交换机绑定时的路由键,相当于是这个队列的路由键
// 支持模糊匹配,支持*和#
// *代表一个单词,#代表任意个单词
channel.queueBind(ConnectionUtil.QUEUE_NAME+"error", "topic"+ConnectionUtil.EXCHANGE_NAME, "info.*");
channel.queueBind(ConnectionUtil.QUEUE_NAME+"debug", "topic"+ConnectionUtil.EXCHANGE_NAME, "user.#");
channel.queueBind(ConnectionUtil.QUEUE_NAME+"user", "topic"+ConnectionUtil.EXCHANGE_NAME, "*.info.*");
// 第二个参数是路由键,属于这个消息的路由键
// 在Topic类型的交换机下这个路由键和上面绑定的路由键相匹配的时候队列才能接收消息
// 下面的信息会发送到error队列
channel.basicPublish("topic"+ConnectionUtil.EXCHANGE_NAME, "info.error", null, ("info.error"+message).getBytes());
// 下面的信息会发送到debug和user队列
channel.basicPublish("topic"+ConnectionUtil.EXCHANGE_NAME, "user.info.error", null, ("user.info.error"+message).getBytes());
// 下面的信息会发送到error和debug队列
channel.basicPublish("topic"+ConnectionUtil.EXCHANGE_NAME, "info.debug", null, ("info.debug"+message).getBytes());
System.out.println("发送的信息为:" + message);
channel.close();
connection.close();
}
与springboot的整合
引入依赖
除了一般的springboot项目的依赖后需要引入下面的依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
配置连接
有两种方式,yml配置和javaConfig
yml配置
需要配置如下参数,基本相当于前面原生代码中与RabbitMq的连接
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: zdd
password: 123456
virtual-host: testhost
javaConfig
在spring的配置类中配置如下的bean对象
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory("localhost",5672);
//我这里直接在构造方法传入了
// connectionFactory.setHost();
// connectionFactory.setPort();
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
connectionFactory.setVirtualHost("testhost");
//是否开启消息确认机制
//connectionFactory.setPublisherConfirms(true);
return connectionFactory;
}
创建交换机
同样是在spring配置类中
@Bean
public DirectExchange defaultExchange() {
return new DirectExchange("directExchange");
}
创建队列
@Bean
public Queue queue() {
//名字 是否持久化
return new Queue("testQueue", true);
}
绑定队列和交换机
@Bean
public Binding binding() {
//绑定一个队列 to: 绑定到哪个交换机上面 with:绑定的路由建(routingKey)
return BindingBuilder.bind(queue()).to(defaultExchange()).with("direct.key");
}
发送消息消息
发送消息比较简单,spring提供了一个RabbitTemplate来帮助我们完成发送消息的操作。
如果想对于RabbitTemplate这个类进行一些配置(至于有哪些配置我们后面会讲到) 我们可以在config类中吧他作为Bean new出来并配置。
- 配置RabbitTemplate
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
//注意 这个ConnectionFactory 是使用javaconfig方式配置连接的时候才需要传入的 如果是yml配置的连接的话是不需要的
RabbitTemplate template = new RabbitTemplate(connectionFactory);
return template;
}
- 发送消息
@Component
public class TestSend {
@Autowired
RabbitTemplate rabbitTemplate;
public void testSend() {
//参数介绍: 交换机名字,路由建, 消息内容
rabbitTemplate.convertAndSend("directExchange", "direct.key", "hello");
}
}
我们只需要写一个类,然后交给spring管理,在类里面注入RabbitTemplate 就可以直接调用api来发送消息了。
接收消息
同发送信息,只需要一个bean对象,在一个方法上加上@RabbitListener,设置需要接收消息的队列,就可以通过参数获得消息。
@Component
public class TestListener {
@RabbitListener(queues = "testQueue")
public void get(String message) throws Exception{
System.out.println(message);
}
}