消息中间件(4)—— 消息发布

前面介绍消息中间件的概念,以及几种交换器的特性

也说了生产者生产消息,并发给交换器,由交换器将消息发给队列,再由绑定到队列上的消费者进行消费

本节学习生产者生产消息,发布时的几种方式

一、无保障

在之前的学习中,通过channel.basicPublish(EXCHANGE_NAME, routekey,null, message.getBytes());

发布并使用正确的交换器和路由信息,消息会被接收并发送到合适的队列中

但是如果出现: 1、网络问题,2、消息不可路由,3、RabbitMQ 自身有问题

这种方式就存在风险,无保证的消息发送一般情况下不推荐

二、失败确认

在发送消息时设置 mandatory 标志为true,channel.basicPublish(EXCHANGE_NAME,routekey,true,null,message.getBytes());

告诉RabbitMQ,如果消息不可路由,应该将消息返回给发送者,并通知失败

注意:它只会让RabbitMQ 向你通知失败,而不会通知成功

即如果消息正确路由到队列,则发布者不会收到任何通知

这叫导致无法确保发布消息一定是成功的,因为通知失败的消息可能会丢失

采用失败确认的方式,需设置一个回调,当生产者接收到RabbitMQ 的通知后,进行处理

        //失败通知 回调
        channel.addReturnListener(new ReturnListener() {
            public void handleReturn(int replycode, String replyText, String exchange, String routeKey, AMQP.BasicProperties basicProperties, byte[] bytes) throws IOException {
                String message = new String(bytes);
                System.out.println("返回的replycode:"+replycode);
                System.out.println("返回的replyText:"+replyText);
            }
        });

2.1 额外的监听器

当生产者监听到信道或者连接关闭时,可以进行相应处理

        //TODO 回调
        //连接关闭时执行
        connection.addShutdownListener(new ShutdownListener() {
            public void shutdownCompleted(ShutdownSignalException e) {
            }
        });

        //TODO 回调
        //信道关闭时执行
        channel.addShutdownListener(new ShutdownListener() {
            public void shutdownCompleted(ShutdownSignalException e) {
            }
        });

2.2 失败确认生产者完整demo

public class ProducerMandatory {

    public final static String EXCHANGE_NAME = "mandatoryTest";

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        //创建连接连接到RabbitMQ
        ConnectionFactory factory = new ConnectionFactory();
        // 设置MabbitMQ所在主机ip
        factory.setHost("127.0.0.1");
        // 创建一个连接
        Connection connection = factory.newConnection();
        // 创建一个信道
        Channel channel = connection.createChannel();
        // 指定Direct交换器
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

        //TODO 回调
        //连接关闭时执行
        connection.addShutdownListener(new ShutdownListener() {
            public void shutdownCompleted(ShutdownSignalException e) {
               System.out.println("连接已关闭");
            }
        });

        //TODO 回调
        //信道关闭时执行
        channel.addShutdownListener(new ShutdownListener() {
            public void shutdownCompleted(ShutdownSignalException e) {
               System.out.println("信道已关闭");
            }
        });

        //TODO
        //失败通知 回调
        channel.addReturnListener(new ReturnListener() {
            public void handleReturn(int replycode, String replyText, String exchange, String routeKey, AMQP.BasicProperties basicProperties, byte[] bytes) throws IOException {
                String message = new String(bytes);
                System.out.println("收到MQ的反馈,有消息路由不可达");
                System.out.println("返回的replycode:"+replycode);
                System.out.println("返回的replyText:"+replyText);
            }
        });

        String[] routekeys={"zc1","zc2","zc3"};
        for(int i=0;i<3;i++){
            String routekey = routekeys[i%3];
            // 发送的消息
            String message = "Hello World_"+(i+1) +("_"+System.currentTimeMillis());
            //TODO
            channel.basicPublish(EXCHANGE_NAME,routekey,true,null,message.getBytes());
            Thread.sleep(200);
        }
        // 关闭频道和连接
        channel.close();
        connection.close();
    }
}

三、发送方通知

MQ内部发生错误无法投递到队列时,则向生产者返回Nack,成功投递则返回ack

原理:生产者将信道设置成confirm模式,一旦信道进入confirm 模式,所有在该信道上面发布的消息都将会被指派一个唯一的 ID(从1开始),由这个id 在生产者和RabbitMQ 之间进行消息的确认

不可路由的消息,当交换器发现,消息不能投递到任何队列,会进行确认操作,表示收到了消息。如果发送方设置了mandatory 模式,则会先调用 addReturnListener 监听器

可路由的消息,要等到消息被投递到所有匹配的队列之后,broker 会发送一个确认给生产者(包含消息的唯一 ID),这就使得生产者知道消息已经正确到达目的队列了,如果消息和队列是可持久化的,那么确认消息会在将消息写入磁盘之后发出,broker 回传给生产者的确认消息中 delivery-tag 域包含了确认消息的序列号

3.1 三种模式:

一般确认模式

批量确认模式

异步监听发送方确认模式

3.1.1 一般确认模式demo

public class ProducerConfirm {
    public final static String EXCHANGE_NAME = "producerConfirm";
    private final static String ROUTE_KEY = "zc";

    public static void main(String[] args) throws Exception {
        // 创建连接连接到RabbitMQ
        ConnectionFactory factory = new ConnectionFactory();
        // 设置MabbitMQ所在主机ip或者主机名
        factory.setHost("127.0.0.1");
        // 创建一个连接
        Connection connection = factory.newConnection();
        // 创建一个信道
        Channel channel = connection.createChannel();
        // 指定交换器
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
        
        //1、启用发送者确认模式
        channel.confirmSelect();

        //所有日志严重性级别
        for(int i=0;i<2;i++){
            // 发送的消息
            String message = "Hello World_"+(i+1);
            channel.basicPublish(EXCHANGE_NAME, ROUTE_KEY,null, message.getBytes());
            System.out.println(" Sent Message: [" + ROUTE_KEY +"]:'"+ message + "'");
            
            //2、确认是否成功(true成功)
            if(channel.waitForConfirms()){
                System.out.println("send success");
            }else{
                System.out.println("send failure");
            }
        }
        // 关闭信道和连接
        channel.close();
        connection.close();
    }
}

3.1.2 批量确认模式demo

注意:失败确认和发送者通知可以同时使用

在批量确认模式中展示,如果发送方设置了mandatory 模式,则会先调用 addReturnListener 监听

public class ProducerBatchConfirm {

    public final static String EXCHANGE_NAME = "producer_wait_confirm";
    private final static String ROUTE_KEY = "zc";

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        // 创建连接连接到RabbitMQ
        ConnectionFactory factory = new ConnectionFactory();
        // 设置RabbitMQ所在主机ip或者主机名
        factory.setHost("127.0.0.1");
        // 创建一个连接
        Connection connection = factory.newConnection();
        // 创建一个信道
        Channel channel = connection.createChannel();
        // 指定转发
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

        //4、添加失败通知的监听器
        channel.addReturnListener(new ReturnListener() {
            public void handleReturn(int replyCode, String replyText,
                                     String exchange, String routingKey,
                                     AMQP.BasicProperties properties,
                                     byte[] body)
                    throws IOException {
                String message = new String(body);
                System.out.println("RabbitMq返回的replyCode:  "+replyCode);
                System.out.println("RabbitMq返回的replyText:  "+replyText);
            }
        });

        //1、启用发送者确认模式
        channel.confirmSelect();

        for(int i=0;i<10;i++){
            // 发送的消息
            String message = "Hello World_"+(i+1);

            //3、开启失败确认
            channel.basicPublish(EXCHANGE_NAME, ROUTE_KEY, true,null, message.getBytes());
        }
        //2、启用发送者确认模式(批量确认)
        channel.waitForConfirmsOrDie();
        // 关闭频道和连接
        channel.close();
        connection.close();
    }
}

3.1.3 异步监听发送方确认模式demo

public class ProducerConfirmAsync {
    public final static String EXCHANGE_NAME = "producerAsyncConfirm";

    public static void main(String[] args) throws Exception {
        // 创建连接连接到MabbitMQ
        ConnectionFactory factory = new ConnectionFactory();
        // 设置MabbitMQ所在主机ip或者主机名
        factory.setHost("127.0.0.1");
        // 创建一个连接
        Connection connection = factory.newConnection();
        // 创建一个信道
        Channel channel = connection.createChannel();
        // 指定转发
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
        //1、启用发送者确认模式
        channel.confirmSelect();
        //2、添加发送者确认监听器
        channel.addConfirmListener(new ConfirmListener() {
            //TODO 成功
            public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                System.out.println("send_ACK:"+deliveryTag+",multiple:"+multiple);
            }
            //TODO 失败
            public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                System.out.println("Erro----send_NACK:"+deliveryTag+",multiple:"+multiple);
            }
        });

        // 添加失败者通知的监听器
        channel.addReturnListener(new ReturnListener() {
            public void handleReturn(int replyCode, String replyText,
                                     String exchange, String routingKey,
                                     AMQP.BasicProperties properties,
                                     byte[] body) throws IOException {
                String message = new String(body);
                System.out.println("RabbitMq路由失败:  "+routingKey+"."+message);
            }
        });

        String[] routekeys={"zc1","zc2"};

        for(int i=0;i<20;i++){
            String routekey = routekeys[i%2];
            // 发送的消息
            String message = "Hello World_"+(i+1)+("_"+System.currentTimeMillis());
            channel.basicPublish(EXCHANGE_NAME, routekey, true,
                    MessageProperties.PERSISTENT_BASIC, message.getBytes());
        }
        // 关闭频道和连接
        //channel.close();
        //connection.close();
    }
}

四、备用交换器

在第一次声明交换器时被指定,用来提供一种预先存在的交换器,如果主交换器无法路由消息,那么消息将被路由到这个新的备用交换器

注意:第二节介绍了失败通知,作用就是当交换器无法路由时,向生产者反馈

而当配置了备用交换器,如果主交换器无法路由消息,RabbitMQ 并不会通知发布者,因为,向备用交换器发送消息,表示消息已经被路由了

4.1 生产者

public class BackupExProducer {

    public final static String EXCHANGE_NAME = "main-exchange";
    public final static String BAK_EXCHANGE_NAME = "bak-exchange";

    public static void main(String[] args)  throws Exception {
        // 创建连接连接到RabbitMQ
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        // 创建一个连接
        Connection connection = factory.newConnection();
        // 创建一个信道
        Channel channel = connection.createChannel();

        //1、声明备用交换器
        Map<String,Object> argsMap = new HashMap<String,Object>();
        argsMap.put("alternate-exchange",BAK_EXCHANGE_NAME);
        //主交换器
        channel.exchangeDeclare(EXCHANGE_NAME,"direct",false,false,argsMap);
        //备用交换器
        channel.exchangeDeclare(BAK_EXCHANGE_NAME,BuiltinExchangeType.FANOUT,true,false,null);

        //所有的消息
        String[] routekeys={"zc1","zc2","zc3"};
        for(int i=0;i<3;i++){
            String routekey = routekeys[i%3];

            // 发送的消息
            String message = "Hello World_"+(i+1);
            channel.basicPublish(EXCHANGE_NAME, routekey,null, message.getBytes());
        }
        // 关闭频道和连接
        channel.close();
        connection.close();
    }
}

4.2 主消费者

public class MainConsumer {
    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        // 打开连接和创建频道,与发送端一样
        Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();

        // 声明一个队列
        String queueName = "backupexchange";
        channel.queueDeclare(queueName,false,false,false,null);

        String routekey="zc1"; 
        channel.queueBind(queueName,BackupExProducer.EXCHANGE_NAME, routekey);

        System.out.println(" [*] Waiting for messages......");

        // 创建队列消费者
        final Consumer consumerB = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body)
                    throws IOException {
                String message = new String(body, "UTF-8");
                //记录日志到文件:
                System.out.println( "Received [" + envelope.getRoutingKey() + "] "+message);
            }
        };
        channel.basicConsume(queueName, true, consumerB);
    }
}

4.3 备用交换器对应的消费者

public class BackupExConsumer {
    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        // 打开连接和创建频道,与发送端一样
        Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();
        
        channel.exchangeDeclare(BackupExProducer.BAK_EXCHANGE_NAME,
                BuiltinExchangeType.FANOUT,true, false, null);
        // 声明一个队列
        String queueName = "fetchother";
        channel.queueDeclare(queueName,false,false, false,null);

        channel.queueBind(queueName,BackupExProducer.BAK_EXCHANGE_NAME, "#");

        System.out.println(" [*] Waiting for messages......");

        // 创建队列消费者
        final Consumer consumerB = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body)
                    throws IOException {
                String message = new String(body, "UTF-8");
                //记录日志到文件:
                System.out.println( "Received ["+ envelope.getRoutingKey() + "] "+message);
            }
        };
        channel.basicConsume(queueName, true, consumerB);
    }
}

总结:

本节主要学习生产者在发布消息时可以采取的一些消息策略

1、无保障。最简单粗暴的消息策略,只管发送,不管MQ是否正常收到

2、失败确认

      实现方式:生产者在发布消息时将mandatory设为true,并设置一个回调函数

      优点:当MQ发现消息不可路由,则向生产者反馈

      缺点:向生产者反馈的消息可能会丢失

3、发送方确认

     MQ内部发生错误无法投入到队列时,则向生产者返回Nack,成功投递则返回ack

      种类:1、一般确认模式  2、批量确认模式   3、异步监听发送方确认模式 

      实现方式:生产者将信道设置为confirm模式(channel.confirmSelect();)

4、备用交换器

      当找不到路由时,可以将消息发送到备用交换器上

      在声明主交换器时,通过交换器的参数,alternate-exchange,,将备用交换器设置给主交换器

      

 

 

 


版权声明:本文为quge_name_harder原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。