1、先进入kafka服务端脚本目录下
2、导出环境变量
3、开始生产消息,观察磁盘目录文件
sh kafka-console-producer.sh --bootstrap-server ****** --producer.config ../config/producer.properties --topic myTopicHA

日志路径该Topic的消息存储 .log文件不断更新时间和大小,说明最新生产的消息已经存储成功
4、解析该Topic消息日志文件Body
sh kafka-run-class.sh kafka.tools.DumpLogSegments --files /opt/***********/myTopicHA-2/00000000000000000000.log --print-data-log

5、生产并消费消息,生产消息不进行加密,消费消息不进行解密
(1)生产消息

(2)配置consumer.properties

(3)消费消息,接收到消息body
sh kafka-console-consumer.sh --bootstrap-server ****** --consumer.config ../config/consumer.properties --topic myTopicHA

6、解析consume_offest
(1) 确认consume_group 在哪个__consumer_offsets-? 中: Math.abs(groupID.hashCode()) % numPartitions
(2)该脚本 kafka-console-consumer.sh ,分区和group不能同时配置,需要注释掉consumer.properties文件中的group.id
(3)将 __consumer_offests_25这个Topic的body另存到自定义文件中
sh kafka-console-consumer.sh --consumer.config ../config/consumer.properties --topic __consumer_offsets --partition 25 --bootstrap-server ****** --from-beginning --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" > /tmp/wht_commit

(4)查看wht_commit文件中的offest提交信息

版权声明:本文为wht_0217原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。