本人一直觉得MirrorMaker只能算是kafka基本功能模块之一(包含在kafka功能中)
本文快速的简单演示如何使用mirrormaker。
主要解决Kafka跨集群同步、创建Kafka集群镜像、topic等相关问题,主要使用Kafka内置的MirrorMaker工具实现。
虽目前市面上关于mirrormaker操作以及跨集群kafka其他迁移工具有不少,本文会系统+快速介绍mirrormaker操作使用要点。
1. mirrormaker定义
2. mirrormaker操作
3. kafka基本操作(验证数据)
4. 效果
1. mirrormaker定义
在kafka迁移操作之前,首先得搞清楚kafka-mirrormaker
MirrorMaker 是Kafka 官方提供的跨数据中心的流数据同步方案。
原理:通过从Source Cluster消费消息然后把消息生产到Target Cluster。
操作:用户只要通过简单的consumer配置和producer配置,然后启动Mirror,就可以实现准实时的数据同步
2. mirrormaker操作
首先需要配置
consumer.properties和producer.properties
consumer.properties(内容为原始集群信息)
#config/consumer.properties 在网上看到有在此配置zookeeper的应该是之前的老版本。kafka_2.11-2.4.1中不需要
bootstrap.servers=kafka-cluster1:9092,kafka-cluster1:9093 # source-cluster的broker list
group.id=test-consumer-group1 # 自定义一个消费者的group id
auto.offset.reset= # latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据; earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费; none:topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
producer.properties(内容为新集群信息)
#config/producer.properties 在网上看到有在此配置zookeeper的应该是之前的老版本。kafka_2.11-2.4.1中不需要
bootstrap.servers=kafka-cluster2:9092,kafka-cluster2:9093 # destination-cluster的broker list
compression.type=none # 数据压缩方式none, gzip, snappy, lz4, zstd
partitioner.class= # 指定分区程序路径,默认为随机分区
request.timeout.ms= # 请求超时时间
max.block.ms= # KafkaProducer.send
and KafkaProducer.partitionsFor
阻塞时间
linger.ms= # 等待指定时间后批量发送
max.request.size= # 发送消息最大字节数
batch.size= # 单次批量处理的字节数
buffer.memory= # 指定等待发送消息的缓冲区大小
执行操作:
./kafka-mirror-maker.sh --consumer.config consumer.properties --producer.config producer.properties --whitelist topicname
3. kafka基本操作(验证数据)
kafka对应的zookeeper地址以及bootstrapserver地址可通过/kafka/conf/中获取到
kafka1
10.0.18.165:9092
zookeeper.connect=localhost:2181
kafka2
10.0.18.46:9092
zookeeper.connect=localhost:2181
查询topic:./kafka-topics.sh --zookeeper 10.0.18.46:2181 --list
查询test topic内容:./kafka-console-consumer.sh --bootstrap-server 10.0.18.46:9092 --from-beginning --topic test
查看group:
./kafka-consumer-groups.sh --all-groups --bootstrap-server 10.0.18.46:9092 --list
查看消费者组的消费情况:
./kafka-consumer-groups.sh --bootstrap-server 10.0.18.46:9092 --describe --group consumergroup
4. 效果
可根据实际情况调试效果,包括
auto.create.topics.enable=true ,以允许自动创建topic
内部命令删除topic
./kafka-topics.sh --delete --zookeeper 10.0.18.46:2181 --topic test
删除topic无法删除干净问题:
https://blog.csdn.net/weixin_39001364/article/details/78475853?utm_source=blogxgwz7
完整文档思路(成功)-再进入zookeeper删除:https://blog.csdn.net/weixin_38846022/article/details/102736580
首先确保 delete.topic.enable=true,再去zookeeper去删除对应topic,重启节点,删除成功
offset参数
auto.offset.reset= # latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据; earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费; none:topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
如有相同topic环境下使用mirrormaker,会把新数据累计到老数据之上。