搭建Kafka高可用集群

搭建Kafka高可用集群

此文以Kafka 2.8.0版本为例!

如未指定,下述命令在所有节点执行!

系统资源及组件规划

节点名称系统名称CPU/内存网卡磁盘IP地址OS节点角色
Kafka1kafka12C/4Gens33128G192.168.0.11CentOS7Kafka、ZooKeeper
Kafka2kafka22C/4Gens33128G192.168.0.12CentOS7Kafka、ZooKeeper
Kafka3kafka32C/4Gens33128G192.168.0.12CentOS7Kafka、ZooKeeper

二、系统软件安装与设置

1、安装基本软件

yum -y install vim lrzsz bash-completion

在这里插入图片描述

2、设置名称解析

echo 192.168.0.11 kafka1 >> /etc/hosts
echo 192.168.0.12 kafka2 >> /etc/hosts
echo 192.168.0.13 kafka3 >> /etc/hosts

在这里插入图片描述

3、设置NTP

yum -y install chrony

在这里插入图片描述

systemctl start chronyd
systemctl enable chronyd
systemctl status chronyd

在这里插入图片描述

chronyc sources

在这里插入图片描述

4、设置SELinux、防火墙

systemctl stop firewalld
systemctl disable firewalld
setenforce 0
sed -i 's/SELINUX=enforcing/SELINUX=disabled/g' /etc/selinux/config

在这里插入图片描述

三、搭建Kafka高可用集群

1、安装JDK

下载JDK文件:

参考地址:https://www.oracle.com/java/technologies/javase/javase-jdk8-downloads.html

解压JDK安装文件:

tar -xf /root/jdk-8u291-linux-x64.tar.gz -C /usr/local/

在这里插入图片描述

设置环境变量:

export JAVA_HOME=/usr/local/jdk1.8.0_291/
export PATH=$PATH:/usr/local/jdk1.8.0_291/bin/

在这里插入图片描述

添加环境变量至/etc/profile文件:

export JAVA_HOME=/usr/local/jdk1.8.0_291/
PATH=$PATH:/usr/local/jdk1.8.0_291/bin/

在这里插入图片描述

查看Java版本:

java -version

在这里插入图片描述

2、安装ZooKeeper

下载ZooKeeper文件:

参考地址:https://downloads.apache.org/zookeeper/stable/

在Kafka节点上(ZooKeeper节点)解压ZooKeeper安装文件:

tar -xf /root/apache-zookeeper-3.6.3-bin.tar.gz -C /usr/local/

在这里插入图片描述

在Kafka节点上(ZooKeeper节点)设置环境变量:

export PATH=$PATH:/usr/local/apache-zookeeper-3.6.3-bin/bin/

在这里插入图片描述

在Kafka节点上(ZooKeeper节点)添加环境变量至/etc/profile文件:

PATH=$PATH:/usr/local/apache-zookeeper-3.6.3-bin/bin/

在这里插入图片描述

在Kafka节点上(ZooKeeper节点)创建ZooKeeper数据目录:

mkdir /usr/local/apache-zookeeper-3.6.3-bin/data/

在这里插入图片描述

在Kafka节点上(ZooKeeper节点)创建ZooKeeper配置文件:

mv /usr/local/apache-zookeeper-3.6.3-bin/conf/zoo_sample.cfg /usr/local/apache-zookeeper-3.6.3-bin/conf/zoo.cfg

在这里插入图片描述

在Kafka节点上(ZooKeeper节点)修改ZooKeeper配置文件:

vim /usr/local/apache-zookeeper-3.6.3-bin/conf/zoo.cfg

添加数据目录:

dataDir=/usr/local/apache-zookeeper-3.6.3-bin/data/

在这里插入图片描述

添加ZooKeeper节点:

server.1=kafka1:2888:3888
server.2=kafka2:2888:3888
server.3=kafka3:2888:3888

在这里插入图片描述

ZooKeeper配置参数解读

Server.A=B:C:D。

A是一个数字,表示这个是第几号服务器;

B是这个服务器的IP地址;

C是这个服务器与集群中的Leader服务器交换信息的端口;

D是当集群中的Leader服务器故障,需要一个端口来重新进行选举,选出一个新的Leader,而端口D就是用来执行选举时服务器相互通信的端口。

集群模式下配置一个文件myid,这个文件在dataDir目录下,这个文件里面有一个数据就是A的值,Zookeeper启动时读取此文件,拿到里面的数据与zoo.cfg里面的配置信息比较从而判断到底是哪个server。

在Kafka1节点上(ZooKeeper节点)创建myid文件,并添加A值:

touch /usr/local/apache-zookeeper-3.6.3-bin/data/myid
echo 1 > /usr/local/apache-zookeeper-3.6.3-bin/data/myid

在这里插入图片描述

在Kafka2节点上(ZooKeeper节点)创建myid文件,并添加A值:

touch /usr/local/apache-zookeeper-3.6.3-bin/data/myid
echo 2 > /usr/local/apache-zookeeper-3.6.3-bin/data/myid

在这里插入图片描述

在Kafka3节点上(ZooKeeper节点)创建myid文件,并添加A值:

touch /usr/local/apache-zookeeper-3.6.3-bin/data/myid
echo 3 > /usr/local/apache-zookeeper-3.6.3-bin/data/myid

在这里插入图片描述

在Kafka节点上(ZooKeeper节点)启动ZooKeeper:

zkServer.sh start

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

在Kafka节点上(ZooKeeper节点)查看ZooKeeper状态:

zkServer.sh status

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

3、安装Kafka集群

下载Kafka文件:

参考地址:http://kafka.apache.org/downloads

解压Kafka安装文件:

tar -zxf /root/kafka_2.13-2.8.0.tgz -C /usr/local/

在这里插入图片描述

设置环境变量:

export PATH=$PATH:/usr/local/kafka_2.13-2.8.0/bin/

在这里插入图片描述

添加环境变量至/etc/profile文件:

PATH=$PATH:/usr/local/kafka_2.13-2.8.0/bin/

在这里插入图片描述

4、配置Kafka集群

在Kafka1节点上修改/usr/local/kafka_2.13-2.8.0/config/server.properties文件:

cat > /usr/local/kafka_2.13-2.8.0/config/server.properties << EOF
broker.id=0
port=9092
host.name=kafka1
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/usr/local/kafka_2.13-2.8.0/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=kafka1:2181,kafka2:2181,kafka3:2181
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0
EOF

在这里插入图片描述

在Kafka2节点上修改/usr/local/kafka_2.13-2.8.0/config/server.properties文件:

cat > /usr/local/kafka_2.13-2.8.0/config/server.properties << EOF
broker.id=1
port=9092
host.name=kafka2
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/usr/local/kafka_2.13-2.8.0/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=kafka1:2181,kafka2:2181,kafka3:2181
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0
EOF

在这里插入图片描述

在Kafka3节点上修改/usr/local/kafka_2.13-2.8.0/config/server.properties文件:

cat > /usr/local/kafka_2.13-2.8.0/config/server.properties << EOF
broker.id=2
port=9092
host.name=kafka3
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/usr/local/kafka_2.13-2.8.0/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=kafka1:2181,kafka2:2181,kafka3:2181
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0
EOF

在这里插入图片描述

5、启动Kafka集群

在Kafka节点上启动Kafka:

kafka-server-start.sh -daemon /usr/local/kafka_2.13-2.8.0/config/server.properties

在这里插入图片描述

查看Kafka进程:

jps

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

6、Kafka功能演示

6.1 创建Topic

方式一:

使用replica-assignment参数手动指定Topic、Partition、Replica与Kafka
Broker之间的存储映射关系

kafka-topics.sh --create --zookeeper kafka1:2181,kafka2:2181,kafka3:2181 --topic topic1 --replica-assignment 0:1,1:2,2:0

在这里插入图片描述

指定分区数为3,副本数为2,0:1,1:2,2:0均为broker.id

方式二:

使用partitions和replication-factor参数自动分配存储映射关系

kafka-topics.sh --create --zookeeper kafka1:2181,kafka2:2181,kafka3:2181 --topic topic2 --partitions 3 --replication-factor 2

在这里插入图片描述

指定分区数为3,副本数为2

方式三:

创建Topic时指定参数

kafka-topics.sh --create --zookeeper kafka1:2181,kafka2:2181,kafka3:2181 --topic topic3 --partitions 3 --replication-factor 2 --config cleanup.policy=compact --config retention.ms=500

在这里插入图片描述

指定分区数为3,副本数为2,指定cleanup.policy和retention参数

6.2 查看Topic

kafka-topics.sh --list --zookeeper kafka1:2181,kafka2:2181,kafka3:2181

在这里插入图片描述

kafka-topics.sh --describe --zookeeper kafka1:2181,kafka2:2181,kafka3:2181 --topic topic1

在这里插入图片描述

kafka-topics.sh --describe --zookeeper kafka1:2181,kafka2:2181,kafka3:2181 --topic topic2

在这里插入图片描述

kafka-topics.sh --describe --zookeeper kafka1:2181,kafka2:2181,kafka3:2181 --topic topic3

在这里插入图片描述

6.3 修改Topic

kafka-topics.sh --create --zookeeper kafka1:2181,kafka2:2181,kafka3:2181 --topic topic --partitions 2 --replication-factor 2

在这里插入图片描述

增加分区数:

kafka-topics.sh --alter --zookeeper kafka1:2181,kafka2:2181,kafka3:2181 --topic topic --partitions 3

在这里插入图片描述

增加配置:

kafka-topics.sh --alter --zookeeper kafka1:2181,kafka2:2181,kafka3:2181 --topic topic --config flush.messages=1

在这里插入图片描述

删除配置:

kafka-topics.sh --alter --zookeeper kafka1:2181,kafka2:2181,kafka3:2181 --topic topic --delete-config flush.messages

在这里插入图片描述

6.4 删除Topic

kafka-topics.sh --delete --zookeeper kafka1:2181,kafka2:2181,kafka3:2181 --topic topic

在这里插入图片描述

6.5 启动Producer

在任意Kafka节点上启动Producer:

kafka-console-producer.sh --broker-list kafka1:9092,kafka2:9092,kafka3:9092 --topic topic1

在这里插入图片描述

6.6 启动Consumer

在任意Kafka节点上启动Consumer:

kafka-console-consumer.sh --bootstrap-server kafka1:9092,kafka2:9092,kafka3:9092 --topic topic1 --from-beginning

在这里插入图片描述

7、Kafka集群故障演示

在任意Kafka节点上查看topic1状态:

kafka-topics.sh --describe --zookeeper kafka1:2181,kafka2:2181,kafka3:2181 --topic topic1

在这里插入图片描述

Kafka3节点故障

在任意Kafka节点上查看topic1状态:

kafka-topics.sh --describe --zookeeper kafka1:2181,kafka2:2181,kafka3:2181 --topic topic1

在这里插入图片描述

Kafka3节点恢复,启动ZooKeeper和Kafka

在任意Kafka节点上查看topic1状态:

kafka-topics.sh --describe --zookeeper kafka1:2181,kafka2:2181,kafka3:2181 --topic topic1

在这里插入图片描述

在任意Kafka节点上对Topic触发自动负载均衡:

kafka-preferred-replica-election.sh --zookeeper kafka1:2181,kafka2:2181,kafka3:2181

在这里插入图片描述

kafka-topics.sh --describe --zookeeper kafka1:2181,kafka2:2181,kafka3:2181 --topic topic1

在这里插入图片描述


版权声明:本文为mengshicheng1992原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。