前面介绍消息中间件的概念,以及几种交换器的特性
也说了生产者生产消息,并发给交换器,由交换器将消息发给队列,再由绑定到队列上的消费者进行消费
本节学习生产者生产消息,发布时的几种方式
一、无保障
在之前的学习中,通过channel.basicPublish(EXCHANGE_NAME, routekey,null, message.getBytes());
发布并使用正确的交换器和路由信息,消息会被接收并发送到合适的队列中
但是如果出现: 1、网络问题,2、消息不可路由,3、RabbitMQ 自身有问题
这种方式就存在风险,无保证的消息发送一般情况下不推荐
二、失败确认
在发送消息时设置 mandatory 标志为true,channel.basicPublish(EXCHANGE_NAME,routekey,true,null,message.getBytes());
告诉RabbitMQ,如果消息不可路由,应该将消息返回给发送者,并通知失败
注意:它只会让RabbitMQ 向你通知失败,而不会通知成功
即如果消息正确路由到队列,则发布者不会收到任何通知
这叫导致无法确保发布消息一定是成功的,因为通知失败的消息可能会丢失
采用失败确认的方式,需设置一个回调,当生产者接收到RabbitMQ 的通知后,进行处理
//失败通知 回调
channel.addReturnListener(new ReturnListener() {
public void handleReturn(int replycode, String replyText, String exchange, String routeKey, AMQP.BasicProperties basicProperties, byte[] bytes) throws IOException {
String message = new String(bytes);
System.out.println("返回的replycode:"+replycode);
System.out.println("返回的replyText:"+replyText);
}
});2.1 额外的监听器
当生产者监听到信道或者连接关闭时,可以进行相应处理
//TODO 回调
//连接关闭时执行
connection.addShutdownListener(new ShutdownListener() {
public void shutdownCompleted(ShutdownSignalException e) {
}
});
//TODO 回调
//信道关闭时执行
channel.addShutdownListener(new ShutdownListener() {
public void shutdownCompleted(ShutdownSignalException e) {
}
});2.2 失败确认生产者完整demo
public class ProducerMandatory {
public final static String EXCHANGE_NAME = "mandatoryTest";
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
//创建连接连接到RabbitMQ
ConnectionFactory factory = new ConnectionFactory();
// 设置MabbitMQ所在主机ip
factory.setHost("127.0.0.1");
// 创建一个连接
Connection connection = factory.newConnection();
// 创建一个信道
Channel channel = connection.createChannel();
// 指定Direct交换器
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
//TODO 回调
//连接关闭时执行
connection.addShutdownListener(new ShutdownListener() {
public void shutdownCompleted(ShutdownSignalException e) {
System.out.println("连接已关闭");
}
});
//TODO 回调
//信道关闭时执行
channel.addShutdownListener(new ShutdownListener() {
public void shutdownCompleted(ShutdownSignalException e) {
System.out.println("信道已关闭");
}
});
//TODO
//失败通知 回调
channel.addReturnListener(new ReturnListener() {
public void handleReturn(int replycode, String replyText, String exchange, String routeKey, AMQP.BasicProperties basicProperties, byte[] bytes) throws IOException {
String message = new String(bytes);
System.out.println("收到MQ的反馈,有消息路由不可达");
System.out.println("返回的replycode:"+replycode);
System.out.println("返回的replyText:"+replyText);
}
});
String[] routekeys={"zc1","zc2","zc3"};
for(int i=0;i<3;i++){
String routekey = routekeys[i%3];
// 发送的消息
String message = "Hello World_"+(i+1) +("_"+System.currentTimeMillis());
//TODO
channel.basicPublish(EXCHANGE_NAME,routekey,true,null,message.getBytes());
Thread.sleep(200);
}
// 关闭频道和连接
channel.close();
connection.close();
}
}三、发送方通知
MQ内部发生错误无法投递到队列时,则向生产者返回Nack,成功投递则返回ack
原理:生产者将信道设置成confirm模式,一旦信道进入confirm 模式,所有在该信道上面发布的消息都将会被指派一个唯一的 ID(从1开始),由这个id 在生产者和RabbitMQ 之间进行消息的确认
不可路由的消息,当交换器发现,消息不能投递到任何队列,会进行确认操作,表示收到了消息。如果发送方设置了mandatory 模式,则会先调用 addReturnListener 监听器
可路由的消息,要等到消息被投递到所有匹配的队列之后,broker 会发送一个确认给生产者(包含消息的唯一 ID),这就使得生产者知道消息已经正确到达目的队列了,如果消息和队列是可持久化的,那么确认消息会在将消息写入磁盘之后发出,broker 回传给生产者的确认消息中 delivery-tag 域包含了确认消息的序列号
3.1 三种模式:
一般确认模式
批量确认模式
异步监听发送方确认模式
3.1.1 一般确认模式demo
public class ProducerConfirm {
public final static String EXCHANGE_NAME = "producerConfirm";
private final static String ROUTE_KEY = "zc";
public static void main(String[] args) throws Exception {
// 创建连接连接到RabbitMQ
ConnectionFactory factory = new ConnectionFactory();
// 设置MabbitMQ所在主机ip或者主机名
factory.setHost("127.0.0.1");
// 创建一个连接
Connection connection = factory.newConnection();
// 创建一个信道
Channel channel = connection.createChannel();
// 指定交换器
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
//1、启用发送者确认模式
channel.confirmSelect();
//所有日志严重性级别
for(int i=0;i<2;i++){
// 发送的消息
String message = "Hello World_"+(i+1);
channel.basicPublish(EXCHANGE_NAME, ROUTE_KEY,null, message.getBytes());
System.out.println(" Sent Message: [" + ROUTE_KEY +"]:'"+ message + "'");
//2、确认是否成功(true成功)
if(channel.waitForConfirms()){
System.out.println("send success");
}else{
System.out.println("send failure");
}
}
// 关闭信道和连接
channel.close();
connection.close();
}
}3.1.2 批量确认模式demo
注意:失败确认和发送者通知可以同时使用
在批量确认模式中展示,如果发送方设置了mandatory 模式,则会先调用 addReturnListener 监听
public class ProducerBatchConfirm {
public final static String EXCHANGE_NAME = "producer_wait_confirm";
private final static String ROUTE_KEY = "zc";
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
// 创建连接连接到RabbitMQ
ConnectionFactory factory = new ConnectionFactory();
// 设置RabbitMQ所在主机ip或者主机名
factory.setHost("127.0.0.1");
// 创建一个连接
Connection connection = factory.newConnection();
// 创建一个信道
Channel channel = connection.createChannel();
// 指定转发
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
//4、添加失败通知的监听器
channel.addReturnListener(new ReturnListener() {
public void handleReturn(int replyCode, String replyText,
String exchange, String routingKey,
AMQP.BasicProperties properties,
byte[] body)
throws IOException {
String message = new String(body);
System.out.println("RabbitMq返回的replyCode: "+replyCode);
System.out.println("RabbitMq返回的replyText: "+replyText);
}
});
//1、启用发送者确认模式
channel.confirmSelect();
for(int i=0;i<10;i++){
// 发送的消息
String message = "Hello World_"+(i+1);
//3、开启失败确认
channel.basicPublish(EXCHANGE_NAME, ROUTE_KEY, true,null, message.getBytes());
}
//2、启用发送者确认模式(批量确认)
channel.waitForConfirmsOrDie();
// 关闭频道和连接
channel.close();
connection.close();
}
}3.1.3 异步监听发送方确认模式demo
public class ProducerConfirmAsync {
public final static String EXCHANGE_NAME = "producerAsyncConfirm";
public static void main(String[] args) throws Exception {
// 创建连接连接到MabbitMQ
ConnectionFactory factory = new ConnectionFactory();
// 设置MabbitMQ所在主机ip或者主机名
factory.setHost("127.0.0.1");
// 创建一个连接
Connection connection = factory.newConnection();
// 创建一个信道
Channel channel = connection.createChannel();
// 指定转发
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
//1、启用发送者确认模式
channel.confirmSelect();
//2、添加发送者确认监听器
channel.addConfirmListener(new ConfirmListener() {
//TODO 成功
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
System.out.println("send_ACK:"+deliveryTag+",multiple:"+multiple);
}
//TODO 失败
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
System.out.println("Erro----send_NACK:"+deliveryTag+",multiple:"+multiple);
}
});
// 添加失败者通知的监听器
channel.addReturnListener(new ReturnListener() {
public void handleReturn(int replyCode, String replyText,
String exchange, String routingKey,
AMQP.BasicProperties properties,
byte[] body) throws IOException {
String message = new String(body);
System.out.println("RabbitMq路由失败: "+routingKey+"."+message);
}
});
String[] routekeys={"zc1","zc2"};
for(int i=0;i<20;i++){
String routekey = routekeys[i%2];
// 发送的消息
String message = "Hello World_"+(i+1)+("_"+System.currentTimeMillis());
channel.basicPublish(EXCHANGE_NAME, routekey, true,
MessageProperties.PERSISTENT_BASIC, message.getBytes());
}
// 关闭频道和连接
//channel.close();
//connection.close();
}
}四、备用交换器
在第一次声明交换器时被指定,用来提供一种预先存在的交换器,如果主交换器无法路由消息,那么消息将被路由到这个新的备用交换器
注意:第二节介绍了失败通知,作用就是当交换器无法路由时,向生产者反馈
而当配置了备用交换器,如果主交换器无法路由消息,RabbitMQ 并不会通知发布者,因为,向备用交换器发送消息,表示消息已经被路由了
4.1 生产者
public class BackupExProducer {
public final static String EXCHANGE_NAME = "main-exchange";
public final static String BAK_EXCHANGE_NAME = "bak-exchange";
public static void main(String[] args) throws Exception {
// 创建连接连接到RabbitMQ
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
// 创建一个连接
Connection connection = factory.newConnection();
// 创建一个信道
Channel channel = connection.createChannel();
//1、声明备用交换器
Map<String,Object> argsMap = new HashMap<String,Object>();
argsMap.put("alternate-exchange",BAK_EXCHANGE_NAME);
//主交换器
channel.exchangeDeclare(EXCHANGE_NAME,"direct",false,false,argsMap);
//备用交换器
channel.exchangeDeclare(BAK_EXCHANGE_NAME,BuiltinExchangeType.FANOUT,true,false,null);
//所有的消息
String[] routekeys={"zc1","zc2","zc3"};
for(int i=0;i<3;i++){
String routekey = routekeys[i%3];
// 发送的消息
String message = "Hello World_"+(i+1);
channel.basicPublish(EXCHANGE_NAME, routekey,null, message.getBytes());
}
// 关闭频道和连接
channel.close();
connection.close();
}
}4.2 主消费者
public class MainConsumer {
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
// 打开连接和创建频道,与发送端一样
Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();
// 声明一个队列
String queueName = "backupexchange";
channel.queueDeclare(queueName,false,false,false,null);
String routekey="zc1";
channel.queueBind(queueName,BackupExProducer.EXCHANGE_NAME, routekey);
System.out.println(" [*] Waiting for messages......");
// 创建队列消费者
final Consumer consumerB = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
//记录日志到文件:
System.out.println( "Received [" + envelope.getRoutingKey() + "] "+message);
}
};
channel.basicConsume(queueName, true, consumerB);
}
}4.3 备用交换器对应的消费者
public class BackupExConsumer {
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
// 打开连接和创建频道,与发送端一样
Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();
channel.exchangeDeclare(BackupExProducer.BAK_EXCHANGE_NAME,
BuiltinExchangeType.FANOUT,true, false, null);
// 声明一个队列
String queueName = "fetchother";
channel.queueDeclare(queueName,false,false, false,null);
channel.queueBind(queueName,BackupExProducer.BAK_EXCHANGE_NAME, "#");
System.out.println(" [*] Waiting for messages......");
// 创建队列消费者
final Consumer consumerB = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
//记录日志到文件:
System.out.println( "Received ["+ envelope.getRoutingKey() + "] "+message);
}
};
channel.basicConsume(queueName, true, consumerB);
}
}总结:
本节主要学习生产者在发布消息时可以采取的一些消息策略
1、无保障。最简单粗暴的消息策略,只管发送,不管MQ是否正常收到
2、失败确认
实现方式:生产者在发布消息时将mandatory设为true,并设置一个回调函数
优点:当MQ发现消息不可路由,则向生产者反馈
缺点:向生产者反馈的消息可能会丢失
3、发送方确认
MQ内部发生错误无法投入到队列时,则向生产者返回Nack,成功投递则返回ack
种类:1、一般确认模式 2、批量确认模式 3、异步监听发送方确认模式
实现方式:生产者将信道设置为confirm模式(channel.confirmSelect();)
4、备用交换器
当找不到路由时,可以将消息发送到备用交换器上
在声明主交换器时,通过交换器的参数,alternate-exchange,,将备用交换器设置给主交换器