【RocketMQ】广播消息(广播模式)

文章目录

概述

这篇文章主要是用来讲解RocketMq广播模式消费实现逻辑,核心差别在于:
1、广播模式消费位移使用本地文件存储、
2、Rebalance过程中同一个ConsumeGroup下的consumer不会进行MessageQueue的分配;而是每个consumer负责消费所有MessageQueue。

示例

public class Consumer {

    public static void main(String[] args) throws InterruptedException, MQClientException {

        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group_test");
        consumer.setNamesrvAddr("localhost:9876");
        consumer.setMessageModel(MessageModel.BROADCASTING);
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.subscribe("TopicTest", "A|B");

        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}

consumer.setMessageModel(MessageModel.BROADCASTING); ,设置为广播模式

参考

广播模式


版权声明:本文为m0_45406092原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。