概述
这篇文章主要是用来讲解RocketMq广播模式消费实现逻辑,核心差别在于:
1、广播模式消费位移使用本地文件存储、
2、Rebalance过程中同一个ConsumeGroup下的consumer不会进行MessageQueue的分配;而是每个consumer负责消费所有MessageQueue。
示例
public class Consumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group_test");
consumer.setNamesrvAddr("localhost:9876");
consumer.setMessageModel(MessageModel.BROADCASTING);
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe("TopicTest", "A|B");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
consumer.setMessageModel(MessageModel.BROADCASTING);
,设置为广播模式
参考
版权声明:本文为m0_45406092原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。