kafka高可用集群搭建流程
机器配置
机器ip地址:10.16.18.213;10.16.18.214
Kafka版本信息
kafka版本:kafka_2.11-1.0.0
zookeeper版本: 使用kafka自带的zookeeper
下载地址:http://archive.apache.org/dist/kafka/1.0.0/kafka_2.11-1.0.0.tgz
Broker
以四个broker为例,每台机器运行两个broker,通过设置端口区分
目前没有使用zookeeper高可靠,有待深入研究~
本次测试所有broker使用10.16.18.213:2181上的zookeeper
| broker1 | broker2 | broker3 | broker4 | |
|---|---|---|---|---|
| brokerId | 1 | 2 | 11 | 12 |
| ip | 10.16.18.213 | 10.16.18.213 | 10.16.18.214 | 10.16.18.214 |
| 端口 | 9093 | 9094 | 9093 | 9094 |
| 消息存储路径 | /root/kafka/kafka_2.11-1.0.0/data1 | /root/kafka/kafka_2.11-1.0.0/data2 | /root/kafka/kafka_2.11-1.0.0/data1 | /root/kafka/kafka_2.11-1.0.0/data2 |
| 日志 | /kafka_2.11-1.0.0/logs | /kafka_2.11-1.0.0/logs | /kafka_2.11-1.0.0/logs | /kafka_2.11-1.0.0/logs |
| zookeeper | 10.16.18.213:2181 | 10.16.18.213:2181 | 10.16.18.213:2181 | 10.16.18.213:2181 |
日志
控制台输出的日志存放在kafka_2.11-1.0.0/logs下,如果启动失败,可以在这里查看失败原因。logs目录下包含以下日志文件
# kafka集群中有一台机器是控制器,那么控制器角色的日志就记录在这里
controller.log
# Kafka权限认证相应操作日志
kafka-authorizer.log
# 请求日志
kafka-request.log
# kafkaGC日志
kafkaServer-gc.log.0.current
# kafka控制台输出日志
kafkaServer.out
# 清理日志
log-cleaner.log
#kafkaAppender的appender和layout
server.log
# Kafka分区角色切换等状态转换日志
state-change.log
# zookeeper GC日志
zookeeper-gc.log.0.current
# zookeeper 控制台输出日志是
zookeeper.out
存储
kafka的消息储存路径通过config/server.properties中的log.dirs进行配置
一个topic可以分成若干个partition,partition还可以细分为segment,一个partition物理上由多个segment组成
当partition设置为4时可以在log.dirs目录下看到一个文件夹
drwxr-xr-x 2 root root 4096 Apr 10 16:10 topic_zzh_test-0
drwxr-xr-x 2 root root 4096 Apr 10 16:10 topic_zzh_test-1
drwxr-xr-x 2 root root 4096 Apr 10 16:10 topic_zzh_test-2
drwxr-xr-x 2 root root 4096 Apr 10 16:10 topic_zzh_test-3
partition文件夹下保存的
00000000000000000000.index
00000000000000000000.log
00000000000000170410.index
00000000000000170410.log
00000000000000239430.index
00000000000000239430.log
partition全局的第一个segment从0开始,后续每个segment文件名为上一个segment文件最后一条消息的offset值,数值大小为64位,20位数字字符长度,没有数字用0填充。
配置
这里以broker1为例,其他broker需要修改broker.id listeners``host.name advertised.listeners log.dirs 等参数,如果配置多个zookeeper的话还需要修改zookeeper.connect
# broker ID
broker.id=1
############# Socket Server Settings #############
# 监听列表 客户端访问方式 指定主机名为0.0.0.0来绑定到所有接口。留空则绑定到默认接口上。 合法监>听器列表的示例:PLAINTEXT:// myhost:9092,SSL://:9091 CLIENT://0.0.0.0:9092,REPLICATION>:// localhost:9093
listeners=PLAINTEXT://:9093
#不建议: 仅在未设置`listeners`时使用。使用`listeners`来代替。 如果设置了broker主机名,则他只会当定到这个地址。如果没设置,将绑定到所有接口。
host.name=10.16.18.213
#监听器发布到ZooKeeper供客户端使用,如果与`listeners`配置不同。在IaaS环境,这可能需要与broker绑定不通的接口。如果没有设置,将使用`listeners`的配置。与`listeners`不同的是,配置0.0.0.0元地址是无效的。
advertised.listeners=PLAINTEXT://10.16.18.213:9093
# 服务器用于从接收网络请求并发送网络响应的线程数
num.network.threads=3
# 服务器用于处理请求的线程数,可能包括磁盘I/O
num.io.threads=8
# 服务端用来处理socket连接的SO_SNDBUF缓冲大小。如果值为-1,则使用系统默认值。
socket.send.buffer.bytes=102400
# 服务端用来处理socket连接的SO_RCVBUFF缓冲大小。如果值为-1,则使用系统默认值。
socket.receive.buffer.bytes=102400
# socket请求的最大大小,这是为了防止server跑光内存,不能大于Java堆的大小。
socket.request.max.bytes=104857600
############## Log Basics ############
# 保存日志数据的目录,如果未设置将使用log.dir的配置。
log.dirs=/root/kafka/kafka_2.11-1.0.0/data1
# 每个topic默认的partitions数量
num.partitions=1
# 每个数据目录,用于启动时日志恢复和关闭时刷新的线程数
num.recovery.threads.per.data.dir=1
############ Internal Topic Settings ##########
# offset topic的副本数(设置的越大,可用性越高)。如果集群大小超过此副本数要求,内部topic创建将失败,。
offsets.topic.replication.factor=3
# 事务topic的副本数(设置的越大,可用性越高)。内部topic在集群数满足副本数之前,将会一直创建失败。
transaction.state.log.replication.factor=3
# 控制主题的最小ISR
transaction.state.log.min.isr=2
################ Log Retention Policy ################
# 日志清理策略设置为 删除 参数: compact 压缩
log.cleanup.policy=delete
#设置日志清理时间为30分钟
log.retention.minutes=30
# 单个日志段文件最大大小
log.segment.bytes=1073741824
# 日志清理器检查是否有日志符合删除的频率(以毫秒为单位)
log.retention.check.interval.ms=300000
################## Zookeeper #################
# Zookeeper主机地址 "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
zookeeper.connect=10.16.18.213:2181
# 与ZK server建立连接的超时时间,没有配置就使用zookeeper.session.timeout.ms
zookeeper.connection.timeout.ms=6000
######### Group Coordinator Settings ###############
# 在执行第一次重新平衡之前,group协调器将等待更多consumer加入group的时间。延迟时间越长意味着重>新平衡的工作可能越小,但是等待处理开始的时间增加。
group.initial.rebalance.delay.ms=0
Topic
参数
# 配置topic副本数量
--relication-factor 3
# 配置topic partitions 数量
--partitions 1
# 配置最小副本数,当且仅当request.required.acks参数设置为-1时,此参数才生效,如果ISR中的副本数少于min.insync.replicas配置的数量时,客户端会返回异常:org.apache.kafka.common.errors.NotEnoughReplicasExceptoin: Messages are rejected since there are fewer in-sync replicas than required。
-- config min.insync.replilcas=2
# 如果某一个partition的所有replica都挂了,就无法保证数据不丢失了。这种情况下有两种可行的方案:
#1 false 等待ISR中任意一个replica“活”过来,并且选它作为leader
#2 true(默认)选择第一个“活”过来的replica(并不一定是在ISR中)作为leader
--config unclean.leader.election.enable=false
常用命令
# 创建topic
bin/kafka-topics.sh --create --zookeeper 10.16.18.213:2181 --replication-factor 3 --partitions 1 --config unclean.leader.election.enable=false --config min.insync.replicas=2 --topic command_topic
# 创建一个生产者
bin/kafka-console-producer.sh --broker-list 10.16.18.213:9094 --request-required-acks -1 --topic command_topic
# 创建一个消费者
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
# 查看指定 Topic 明细
bin/kafka-topics.sh --describe --zookeeper master:2181,slave1:2181,slave2:2181 --topic topic_name
# 删除 Topic
bin/kafka-topics.sh --delete --zookeeper master:2181,slave1:2181,slave2:2181 --topic topic_name
# 修改 Topic
bin/kafka-topics.sh --alter --zookeeper master:2181,slave1:2181,slave2:2181 --topic topic_name --partitions 3
# 查询单个broker 的offset
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 10.16.18.214:9093 -topic command_topic --time -1
# 启动一个broker
./kafka-server-start.sh -daemon ../config/server2.properties
搭建流程
下载Kafka
wget http://archive.apache.org/dist/kafka/1.0.0/kafka_2.11-1.0.0.tgz
tar -xzf kafka_2.11-1.0.0.tgz
cd kafka_2.11-1.0.0
配置Kafka
按照上面介绍的kafka的配置进行配置vim config/server.properties
- 需要注意一个broker有一个配置文件
# 例如213上
# broker1 的配置文件为
server1.properties
# broker2 的配置文件为
server2.properties
- 每个broker需要使用不同的
broker.id - 同一台机器上的broker需要配置不同的监听端口
# 213上的broker1
broker.id=1
PLAINTEXT://:9093
host.name=10.16.18.213
advertised.listeners=PLAINTEXT://10.16.18.213:9093
# 213上的broker2
broker.id=2
PLAINTEXT://:9094
host.name=10.16.18.213
advertised.listeners=PLAINTEXT://10.16.18.213:9094
# 214上的broker11
broker.id=11
PLAINTEXT://:9093
host.name=10.16.18.214
advertised.listeners=PLAINTEXT://10.16.18.214:9093
# 214 上的broker12
broker.id=12
PLAINTEXT://:9094
host.name=10.16.18.214
advertised.listeners=PLAINTEXT://10.16.18.214:9094
- 同一台机器上的broker需要配置不同的保存日志数据的目录,通过
log.dirs进行配置 - 配置zookeeper地址,因为这里共同使用213上的zookeeper所以配置都为
zookeeper.connect=10.16.18.213:2181
启动
# 启动213上的zookeeper
./bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
# 分别启动启动broker
# 213
./bin/kafka-server-start.sh -daemon config/server1.properties
./bin/kafka-server-start.sh -daemon config/server2.properties
# 214
./bin/kafka-server-start.sh -daemon config/server1.properties
./bin/kafka-server-start.sh -daemon config/server2.properties
# 创建topic
bin/kafka-topics.sh --create --zookeeper 10.16.18.213:2181,10.16.18.214:2181 --replication-factor 3 --partitions 1 --config unclean.leader.election.enable=false --config min.insync.replicas=2 --topic command_topic
# 创建生产者
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic
# 创建消费者
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
可以使用Kafka Tool查看服务状态,或者使用命令查看
bin/kafka-topics.sh --describe --zookeeper 10.16.18.213:2181,10.16.18.214:2181 --topic topic_name
高可用测试流程
设置:partitions 1,replication-factor 3 min.insync.replicas=2
- 创建一个topic,向topic发送一些信息
- 杀掉当前的leader观察结果
kafka自动选择新的leader,不影响正常操作 - 再杀掉一个leader
可以查询但是不可以write
户端会返回异常:
org.apache.kafka.common.errors.NotEnoughReplicasExceptoin: Messages are rejected since there are fewer in-sync replicas than required。
参考文章
kafka中文文档:
http://kafka.apachecn.org/quickstart.html
Kafka日志清理之Log Deletion
https://blog.csdn.net/u013256816/article/details/80418297
kafka数据可靠性深度解读
https://hiddenpps.blog.csdn.net/article/details/71091774
kafka集群搭建(自带Zookeeper)
https://www.jianshu.com/p/898ad61c59fd