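Building a Highly Available Kafka Cluster
This article uses Kafka 2.8.0 as an example.
Unless otherwise specified, run the commands below on all nodes.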
I. System Resources and Component Planning
Node Name | Hostname | CPU/Memory | NIC | Disk | IP Address | OS | Node Role |
---|---|---|---|---|---|---|---|
Kafka1 | kafka1 | 2C/4G | ens33 | 128G | 192.168.0.11 | CentOS7 | Kafka, ZooKeeper |
Kafka2 | kafka2 | 2C/4G | ens33 | 128G | 192.168.0.12 | CentOS7 | Kafka, ZooKeeper |
Kafka3 | kafka3 | 2C/4G | ens33 | 128G | 192.168.0.13 | CentOS7 | Kafka, ZooKeeper |
II. System Software Installation and Configuration
1. Install Basic Software
yum -y install vim lrzsz bash-completion
2. Configure Hostname Resolution
echo 192.168.0.11 kafka1 >> /etc/hosts
echo 192.168.0.12 kafka2 >> /etc/hosts
echo 192.168.0.13 kafka3 >> /etc/hosts
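Appending with `echo ... >>` adds duplicate lines whenever the step is repeated. A minimal idempotent sketch, assuming bash and the IP plan from the table (add_host_entries is a hypothetical helper name), appends each entry only when its hostname is not already present in the file:

```shell
#!/usr/bin/env bash
# Hypothetical helper: append the cluster entries to a hosts file,
# skipping any hostname that is already mapped.
add_host_entries() {
  local hosts_file="${1:-/etc/hosts}"
  local entry ip name
  for entry in "192.168.0.11 kafka1" "192.168.0.12 kafka2" "192.168.0.13 kafka3"; do
    ip="${entry%% *}"     # text before the space
    name="${entry##* }"   # text after the space
    # Match the hostname as a whole word so that e.g. kafka10 does not count as kafka1.
    if ! grep -Eq "[[:space:]]${name}([[:space:]]|$)" "$hosts_file" 2>/dev/null; then
      printf '%s %s\n' "$ip" "$name" >> "$hosts_file"
    fi
  done
}
# Usage (as root): add_host_entries /etc/hosts
```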
3. Configure NTP (Time Synchronization)
yum -y install chrony
systemctl start chronyd
systemctl enable chronyd
systemctl status chronyd
chronyc sources
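4. Disable SELinux and the Firewall
setenforce 0 switches SELinux to permissive mode immediately; the sed edit disables it permanently from the next reboot.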
systemctl stop firewalld
systemctl disable firewalld
setenforce 0
sed -i 's/SELINUX=enforcing/SELINUX=disabled/g' /etc/selinux/config
III. Build the Kafka High-Availability Cluster
1. Install the JDK
Download the JDK package:
Reference URL: https://www.oracle.com/java/technologies/javase/javase-jdk8-downloads.html
Extract the JDK package:
tar -xf /root/jdk-8u291-linux-x64.tar.gz -C /usr/local/
Set the environment variables for the current shell:
export JAVA_HOME=/usr/local/jdk1.8.0_291/
export PATH=$PATH:/usr/local/jdk1.8.0_291/bin/
Add the environment variables to /etc/profile so they persist across logins:
export JAVA_HOME=/usr/local/jdk1.8.0_291/
export PATH=$PATH:/usr/local/jdk1.8.0_291/bin/
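One way to add these lines without opening an editor is a quoted heredoc: quoting 'EOF' keeps `$PATH` literal in the file, so it is expanded at login rather than frozen at the moment the file is written. A minimal sketch, assuming bash; append_java_profile is a hypothetical helper, and the grep guard is there so that running it twice does not duplicate the lines:

```shell
#!/usr/bin/env bash
# Hypothetical helper: append the JDK variables to a profile file exactly once.
append_java_profile() {
  local profile="${1:-/etc/profile}"
  if ! grep -q '^export JAVA_HOME=/usr/local/jdk1.8.0_291/' "$profile" 2>/dev/null; then
    # 'EOF' is quoted, so $PATH is written literally and expanded at login time.
    cat >> "$profile" << 'EOF'
export JAVA_HOME=/usr/local/jdk1.8.0_291/
export PATH=$PATH:/usr/local/jdk1.8.0_291/bin/
EOF
  fi
}
# Usage (as root): append_java_profile /etc/profile && source /etc/profile
```

The same pattern works for the ZooKeeper and Kafka PATH lines in the following steps.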
Check the Java version:
java -version
2. Install ZooKeeper
Download the ZooKeeper package:
Reference URL: https://downloads.apache.org/zookeeper/stable/
On the Kafka nodes (which also serve as ZooKeeper nodes), extract the ZooKeeper package:
tar -xf /root/apache-zookeeper-3.6.3-bin.tar.gz -C /usr/local/
On the Kafka nodes (ZooKeeper nodes), set the environment variable:
export PATH=$PATH:/usr/local/apache-zookeeper-3.6.3-bin/bin/
On the Kafka nodes (ZooKeeper nodes), add the environment variable to /etc/profile:
export PATH=$PATH:/usr/local/apache-zookeeper-3.6.3-bin/bin/
On the Kafka nodes (ZooKeeper nodes), create the ZooKeeper data directory:
mkdir /usr/local/apache-zookeeper-3.6.3-bin/data/
On the Kafka nodes (ZooKeeper nodes), create the ZooKeeper configuration file from the bundled sample:
mv /usr/local/apache-zookeeper-3.6.3-bin/conf/zoo_sample.cfg /usr/local/apache-zookeeper-3.6.3-bin/conf/zoo.cfg
On the Kafka nodes (ZooKeeper nodes), edit the ZooKeeper configuration file:
vim /usr/local/apache-zookeeper-3.6.3-bin/conf/zoo.cfg
Change dataDir to the new data directory (zoo_sample.cfg sets it to /tmp/zookeeper by default):
dataDir=/usr/local/apache-zookeeper-3.6.3-bin/data/
Add the ZooKeeper ensemble members:
server.1=kafka1:2888:3888
server.2=kafka2:2888:3888
server.3=kafka3:2888:3888
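For reference, the effective zoo.cfg after these edits is short enough to write in one step. A minimal sketch, assuming the uncommented defaults that zoo_sample.cfg ships with (tickTime=2000, initLimit=10, syncLimit=5, clientPort=2181); write_zoo_cfg is a hypothetical helper that takes the ZooKeeper install directory:

```shell
#!/usr/bin/env bash
# Hypothetical helper: write a complete zoo.cfg for the 3-node ensemble.
write_zoo_cfg() {
  local zk_home="${1:-/usr/local/apache-zookeeper-3.6.3-bin}"
  # EOF is unquoted so that $zk_home is expanded into dataDir.
  cat > "$zk_home/conf/zoo.cfg" << EOF
tickTime=2000
initLimit=10
syncLimit=5
dataDir=$zk_home/data/
clientPort=2181
server.1=kafka1:2888:3888
server.2=kafka2:2888:3888
server.3=kafka3:2888:3888
EOF
}
# Usage: write_zoo_cfg /usr/local/apache-zookeeper-3.6.3-bin
```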
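ZooKeeper configuration parameters explained:
server.A=B:C:D
A is a number identifying which server this is (its server ID).
B is the server's IP address or hostname (hostnames resolved through /etc/hosts are used here).
C is the port this server uses to exchange information with the ensemble's Leader.
D is the port used for leader election: if the Leader fails, the servers communicate with each other on this port to elect a new Leader.
In cluster mode, each server also needs a file named myid in its dataDir. The file contains a single value, the server's A. On startup, ZooKeeper reads this file and compares the value with the server.A entries in zoo.cfg to determine which server it is.
On the Kafka1 node (ZooKeeper node), create the myid file and write its A value: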
touch /usr/local/apache-zookeeper-3.6.3-bin/data/myid
echo 1 > /usr/local/apache-zookeeper-3.6.3-bin/data/myid
On the Kafka2 node (ZooKeeper node), create the myid file and write its A value:
touch /usr/local/apache-zookeeper-3.6.3-bin/data/myid
echo 2 > /usr/local/apache-zookeeper-3.6.3-bin/data/myid
On the Kafka3 node (ZooKeeper node), create the myid file and write its A value:
touch /usr/local/apache-zookeeper-3.6.3-bin/data/myid
echo 3 > /usr/local/apache-zookeeper-3.6.3-bin/data/myid
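The three per-node myid steps can also be replaced by one identical command on every node, provided each node's hostname has been set to match the table (for example with hostnamectl set-hostname kafka1). A minimal sketch, assuming hostnames of the form kafkaN where N equals the server.N number in zoo.cfg; write_myid is a hypothetical helper:

```shell
#!/usr/bin/env bash
# Hypothetical helper: derive the ZooKeeper server ID from a hostname of the
# form kafkaN (N must match server.N in zoo.cfg) and write it to dataDir/myid.
write_myid() {
  local host="${1:-$(hostname -s)}"
  local data_dir="${2:-/usr/local/apache-zookeeper-3.6.3-bin/data}"
  local id="${host#kafka}"   # strip the "kafka" prefix
  # Refuse hostnames that do not follow the kafkaN naming plan.
  case "$id" in
    ''|*[!0-9]*) echo "unexpected hostname: $host" >&2; return 1 ;;
  esac
  echo "$id" > "$data_dir/myid"
}
# Usage on each node: write_myid
```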
On the Kafka nodes (ZooKeeper nodes), start ZooKeeper:
zkServer.sh start
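On the Kafka nodes (ZooKeeper nodes), check the ZooKeeper status; in a healthy ensemble one node reports Mode: leader and the other two report Mode: follower: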
zkServer.sh status
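To check all three roles from a single node, the srvr four-letter command (the one command ZooKeeper 3.6 allows by default, and the one zkServer.sh status relies on) returns the same "Mode:" line over the client port. A minimal sketch, assuming nc is installed (on CentOS 7, yum -y install nmap-ncat); zk_mode and zk_ensemble_modes are hypothetical helper names:

```shell
#!/usr/bin/env bash
# Extract the role (leader, follower or standalone) from `zkServer.sh status`
# or `srvr` output read on stdin.
zk_mode() {
  sed -n 's/^Mode: //p'
}

# Hypothetical helper: query each ensemble member over port 2181 and print its role.
# Assumes nc is available and the hostnames resolve via /etc/hosts.
zk_ensemble_modes() {
  local h
  for h in kafka1 kafka2 kafka3; do
    printf '%s %s\n' "$h" "$(echo srvr | nc "$h" 2181 | zk_mode)"
  done
}
# Usage: zk_ensemble_modes   # expect one "leader" and two "follower"
```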
3. Install the Kafka Cluster
Download the Kafka package:
Reference URL: http://kafka.apache.org/downloads
Extract the Kafka package:
tar -zxf /root/kafka_2.13-2.8.0.tgz -C /usr/local/
Set the environment variable:
export PATH=$PATH:/usr/local/kafka_2.13-2.8.0/bin/
Add the environment variable to /etc/profile:
export PATH=$PATH:/usr/local/kafka_2.13-2.8.0/bin/
4. Configure the Kafka Cluster
On the Kafka1 node, overwrite /usr/local/kafka_2.13-2.8.0/config/server.properties with the following:
cat > /usr/local/kafka_2.13-2.8.0/config/server.properties << EOF
broker.id=0
listeners=PLAINTEXT://kafka1:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/usr/local/kafka_2.13-2.8.0/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=2
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=kafka1:2181,kafka2:2181,kafka3:2181
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0
EOF
On the Kafka2 node, overwrite /usr/local/kafka_2.13-2.8.0/config/server.properties with the following:
cat > /usr/local/kafka_2.13-2.8.0/config/server.properties << EOF
broker.id=1
listeners=PLAINTEXT://kafka2:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/usr/local/kafka_2.13-2.8.0/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=2
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=kafka1:2181,kafka2:2181,kafka3:2181
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0
EOF
On the Kafka3 node, overwrite /usr/local/kafka_2.13-2.8.0/config/server.properties with the following:
cat > /usr/local/kafka_2.13-2.8.0/config/server.properties << EOF
broker.id=2
listeners=PLAINTEXT://kafka3:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/usr/local/kafka_2.13-2.8.0/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=2
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=kafka1:2181,kafka2:2181,kafka3:2181
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0
EOF
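The three files differ only in broker.id and the hostname, so one script can produce the right file on every node. A minimal sketch, assuming hostnames of the form kafkaN (broker.id is N minus 1, matching 0, 1, 2 above); write_server_properties is a hypothetical helper. It uses listeners=PLAINTEXT://host:9092, the non-deprecated replacement for the port and host.name settings in Kafka 2.x, and replicates the internal offsets and transaction topics to all three brokers so they survive one broker failure:

```shell
#!/usr/bin/env bash
# Hypothetical helper: write server.properties for the local broker, deriving
# broker.id and the listener host from a hostname of the form kafkaN.
write_server_properties() {
  local host="${1:-$(hostname -s)}"
  local kafka_home="${2:-/usr/local/kafka_2.13-2.8.0}"
  local n="${host#kafka}"
  case "$n" in
    ''|*[!0-9]*) echo "unexpected hostname: $host" >&2; return 1 ;;
  esac
  cat > "$kafka_home/config/server.properties" << EOF
broker.id=$((n - 1))
listeners=PLAINTEXT://$host:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=$kafka_home/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
# Internal topics replicated to all three brokers.
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=2
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=kafka1:2181,kafka2:2181,kafka3:2181
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0
EOF
}
# Usage on each node: write_server_properties
```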
5. Start the Kafka Cluster
Start Kafka on each Kafka node (ZooKeeper must already be running):
kafka-server-start.sh -daemon /usr/local/kafka_2.13-2.8.0/config/server.properties
Check the Java processes; jps should list both Kafka and QuorumPeerMain (the ZooKeeper server):
jps
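Each running broker also registers its broker.id under /brokers/ids in ZooKeeper, which zookeeper-shell.sh (shipped in Kafka's bin directory) can list. A minimal sketch that normalizes the last line of that listing, such as [0, 1, 2], into a sorted comma-separated list; registered_broker_ids is a hypothetical helper name:

```shell
#!/usr/bin/env bash
# Hypothetical helper: read `zookeeper-shell.sh ... ls /brokers/ids` output on stdin
# and print the broker ids from its last line, sorted and comma-separated.
registered_broker_ids() {
  tail -n 1 | tr -dc '0-9,' | tr ',' '\n' | sort -n | paste -sd, -
}
# Usage: expect "0,1,2" when all three brokers are up
#   zookeeper-shell.sh kafka1:2181 ls /brokers/ids | registered_broker_ids
```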
6. Kafka Feature Demonstration
6.1 Create a Topic
Method 1:
Use the --replica-assignment parameter to manually specify how the topic's partitions and replicas map onto Kafka brokers:
kafka-topics.sh --create --zookeeper kafka1:2181,kafka2:2181,kafka3:2181 --topic topic1 --replica-assignment 0:1,1:2,2:0
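This creates 3 partitions with 2 replicas each. The comma-separated groups 0:1, 1:2 and 2:0 describe partitions 0, 1 and 2 in order, and each number in a group is a broker.id; the first broker in each group is that partition's preferred leader.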
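The assignment string above follows a simple round-robin pattern: partition p places its replicas on brokers (p + r) mod 3 for r = 0, 1. A minimal sketch that generates such strings, assuming broker ids 0 to brokers-1 as in this cluster; replica_assignment is a hypothetical helper, and Kafka's own automatic placement (Method 2) is more elaborate (randomized starting broker, rack awareness):

```shell
#!/usr/bin/env bash
# Hypothetical helper: build a --replica-assignment string by round-robin.
# Partition p gets replicas (p + r) % brokers for r = 0..rf-1;
# the first replica listed for each partition is its preferred leader.
replica_assignment() {
  local partitions="$1" rf="$2" brokers="$3"
  local p r part out=""
  for ((p = 0; p < partitions; p++)); do
    part=""
    for ((r = 0; r < rf; r++)); do
      part+="${part:+:}$(( (p + r) % brokers ))"   # ":" between replicas
    done
    out+="${out:+,}$part"                          # "," between partitions
  done
  echo "$out"
}
# Usage: replica_assignment 3 2 3   # prints 0:1,1:2,2:0
```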
Method 2:
Use the --partitions and --replication-factor parameters to let Kafka assign the mapping automatically:
kafka-topics.sh --create --zookeeper kafka1:2181,kafka2:2181,kafka3:2181 --topic topic2 --partitions 3 --replication-factor 2
This creates 3 partitions with 2 replicas each.
Method 3:
Specify topic-level configuration at creation time:
kafka-topics.sh --create --zookeeper kafka1:2181,kafka2:2181,kafka3:2181 --topic topic3 --partitions 3 --replication-factor 2 --config cleanup.policy=compact --config retention.ms=500
This creates 3 partitions with 2 replicas each and sets the topic-level cleanup.policy and retention.ms configurations.
6.2 View Topics
kafka-topics.sh --list --zookeeper kafka1:2181,kafka2:2181,kafka3:2181
kafka-topics.sh --describe --zookeeper kafka1:2181,kafka2:2181,kafka3:2181 --topic topic1
kafka-topics.sh --describe --zookeeper kafka1:2181,kafka2:2181,kafka3:2181 --topic topic2
kafka-topics.sh --describe --zookeeper kafka1:2181,kafka2:2181,kafka3:2181 --topic topic3
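In Kafka 2.8 the --zookeeper option of kafka-topics.sh is deprecated in favor of --bootstrap-server, which connects to the brokers instead of ZooKeeper (the --zookeeper option was removed in Kafka 3.0). A minimal sketch of a wrapper, assuming the broker addresses from the table; topics is a hypothetical function name, and the command can be overridden through KAFKA_TOPICS:

```shell
#!/usr/bin/env bash
# Broker addresses from the cluster plan; override BOOTSTRAP if yours differ.
BOOTSTRAP="${BOOTSTRAP:-kafka1:9092,kafka2:9092,kafka3:9092}"
# Tool to run; override KAFKA_TOPICS with a full path if it is not on PATH.
KAFKA_TOPICS="${KAFKA_TOPICS:-kafka-topics.sh}"

# Hypothetical wrapper: run kafka-topics.sh against the brokers instead of ZooKeeper.
topics() {
  "$KAFKA_TOPICS" --bootstrap-server "$BOOTSTRAP" "$@"
}
# Usage:
#   topics --list
#   topics --describe --topic topic1
#   topics --create --topic topic2 --partitions 3 --replication-factor 2
```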
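6.3 Modify a Topic
First create a test topic named topic with 2 partitions and 2 replicas:
kafka-topics.sh --create --zookeeper kafka1:2181,kafka2:2181,kafka3:2181 --topic topic --partitions 2 --replication-factor 2
Increase the number of partitions (partitions can be added but never removed):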
kafka-topics.sh --alter --zookeeper kafka1:2181,kafka2:2181,kafka3:2181 --topic topic --partitions 3
Add a topic configuration:
kafka-topics.sh --alter --zookeeper kafka1:2181,kafka2:2181,kafka3:2181 --topic topic --config flush.messages=1
Remove a topic configuration:
kafka-topics.sh --alter --zookeeper kafka1:2181,kafka2:2181,kafka3:2181 --topic topic --delete-config flush.messages
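Topic-level configuration is the job of kafka-configs.sh; changing it through kafka-topics.sh --alter --config is deprecated. A minimal sketch of the same add, remove and inspect operations with kafka-configs.sh over --bootstrap-server, assuming the broker addresses from the table; the wrapper function names are hypothetical:

```shell
#!/usr/bin/env bash
BOOTSTRAP="${BOOTSTRAP:-kafka1:9092,kafka2:9092,kafka3:9092}"
KAFKA_CONFIGS="${KAFKA_CONFIGS:-kafka-configs.sh}"

# Hypothetical wrapper: add or overwrite a topic-level config, e.g. flush.messages=1.
topic_config_add() {
  "$KAFKA_CONFIGS" --bootstrap-server "$BOOTSTRAP" --entity-type topics --entity-name "$1" --alter --add-config "$2"
}

# Hypothetical wrapper: remove a topic-level override, e.g. flush.messages.
topic_config_delete() {
  "$KAFKA_CONFIGS" --bootstrap-server "$BOOTSTRAP" --entity-type topics --entity-name "$1" --alter --delete-config "$2"
}

# Hypothetical wrapper: show the topic's current overrides.
topic_config_describe() {
  "$KAFKA_CONFIGS" --bootstrap-server "$BOOTSTRAP" --entity-type topics --entity-name "$1" --describe
}
# Usage:
#   topic_config_add topic flush.messages=1
#   topic_config_delete topic flush.messages
```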
6.4 Delete a Topic
kafka-topics.sh --delete --zookeeper kafka1:2181,kafka2:2181,kafka3:2181 --topic topic
6.5 Start a Producer
Start a console producer on any Kafka node:
kafka-console-producer.sh --bootstrap-server kafka1:9092,kafka2:9092,kafka3:9092 --topic topic1
6.6 Start a Consumer
Start a console consumer on any Kafka node:
kafka-console-consumer.sh --bootstrap-server kafka1:9092,kafka2:9092,kafka3:9092 --topic topic1 --from-beginning
7. Kafka Cluster Failure Demonstration
On any Kafka node, check the state of topic1:
kafka-topics.sh --describe --zookeeper kafka1:2181,kafka2:2181,kafka3:2181 --topic topic1
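Simulate a failure of the Kafka3 node (for example, by shutting it down).
On any surviving Kafka node, check the state of topic1 again: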
kafka-topics.sh --describe --zookeeper kafka1:2181,kafka2:2181,kafka3:2181 --topic topic1
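While Kafka3 is down, partitions with a replica on broker 2 should show that broker missing from Isr, and partitions it led should have a new leader chosen from the remaining in-sync replicas. kafka-topics.sh can filter for these directly with --describe --under-replicated-partitions. Alternatively, a small awk sketch (hypothetical function name, assuming the whitespace-separated "Replicas:" and "Isr:" fields of the 2.8 describe output) flags any partition whose ISR is smaller than its replica set:

```shell
#!/usr/bin/env bash
# Hypothetical helper: read `kafka-topics.sh --describe` output on stdin and print
# partitions whose in-sync replica list is shorter than the assigned replica list.
under_replicated() {
  awk '{
    part = ""; reps = ""; isr = ""
    for (i = 1; i < NF; i++) {
      if ($i == "Partition:") part = $(i + 1)
      if ($i == "Replicas:")  reps = $(i + 1)
      if ($i == "Isr:")       isr  = $(i + 1)
    }
    # Topic summary lines have no Replicas: field and are skipped.
    if (reps != "" && split(isr, a, ",") < split(reps, b, ","))
      print "partition " part ": replicas " reps ", isr " isr
  }'
}
# Usage:
#   kafka-topics.sh --describe --zookeeper kafka1:2181 --topic topic1 | under_replicated
```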
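Recover the Kafka3 node, then start ZooKeeper and Kafka on it:
zkServer.sh start
kafka-server-start.sh -daemon /usr/local/kafka_2.13-2.8.0/config/server.properties
On any Kafka node, check the state of topic1: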
kafka-topics.sh --describe --zookeeper kafka1:2181,kafka2:2181,kafka3:2181 --topic topic1
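On any Kafka node, trigger a preferred replica leader election so that leadership moves back to each partition's preferred replica (by default Kafka also rebalances leaders periodically on its own, since auto.leader.rebalance.enable=true and leader.imbalance.check.interval.seconds=300):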
kafka-preferred-replica-election.sh --zookeeper kafka1:2181,kafka2:2181,kafka3:2181
kafka-topics.sh --describe --zookeeper kafka1:2181,kafka2:2181,kafka3:2181 --topic topic1
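kafka-preferred-replica-election.sh has been deprecated since Kafka 2.4 in favor of kafka-leader-election.sh, which connects to the brokers directly. A minimal sketch of the equivalent calls, assuming the broker addresses from the table; the wrapper function names are hypothetical:

```shell
#!/usr/bin/env bash
BOOTSTRAP="${BOOTSTRAP:-kafka1:9092,kafka2:9092,kafka3:9092}"
KAFKA_LEADER_ELECTION="${KAFKA_LEADER_ELECTION:-kafka-leader-election.sh}"

# Hypothetical wrapper: move leadership of every partition back to its preferred replica.
preferred_election_all() {
  "$KAFKA_LEADER_ELECTION" --bootstrap-server "$BOOTSTRAP" --election-type preferred --all-topic-partitions
}

# Hypothetical wrapper: the same, limited to one partition of one topic.
preferred_election_partition() {
  "$KAFKA_LEADER_ELECTION" --bootstrap-server "$BOOTSTRAP" --election-type preferred --topic "$1" --partition "$2"
}
# Usage:
#   preferred_election_all
#   preferred_election_partition topic1 0
```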