目录

一.JDK安装:
多的不说了,直接上连接去下载:https://www.oracle.com/java/technologies/javase/javase-jdk8-downloads.html
下载好了,傻瓜式安装,双击,然后下一步。
安装好了后,打开命令行运行命令 java -version

表示安装成功,如果说找不到java命令,则需要配置环境变量:
JAVA_HOME: C:\Program Files\Java\jdk1.8.0_171 (jdk的安装路径)
Path: 在现有的值后面添加"; %JAVA_HOME%\bin"
二.zookeeper集群安装
kafka目前好像也自带zookeeper,但还是单独下载一个吧,分开使用个人总是觉得要好一点,解耦合嘛,然后呢自带默认是单机的,不是集群(虽然可以改,但是个人担心弄坏我们的kafka小宝贝啊)。下载地址:http://zookeeper.apache.org/releases.html。选择自己喜欢的版本把,不要下载source release不然你还得自己编译,反正我没搞过。
1.解压:
下载完了就找个你想放的目录解压出来吧,然后呢,想搞几个zookeeper,你就拷贝几份,重新命名,如:

2.配置:
①.修改配置文件,在conf文件夹下的zoo_sample.cfg改名为zoo.cfg
②.配置快照位置dataDir=E:/software/apache-zookeeper-3.7.0/data
③.配置端口号 clientPort=2181
④.配置集群server地址端口:
server.1=127.0.0.1:2888:3888
server.2=127.0.0.1:2889:3889
server.3=127.0.0.1:2890:3890
格式: server.A = B:C:D
A:是一个数字,表示第几号服务器
B:服务器IP地址
C:是一个端口号,用来集群成员的信息交换,表示这个服务器与集群中的leader服务器交换信息的端口
D:是在leader挂掉时专门用来进行选举leader所用的端口
⑤.配置审核日志:audit.enable=true
zookeeper新版本启动的过程中,zookeeper新增的审核日志是默认关闭,所以控制台输出ZooKeeper audit is disabled,
标准的修改方式应该是在zookeeper的配置文件zoo.cfg新增一行audit.enable=true
另外两个服务的配置做相同处理,不同之处为端口与快照需要配置为自身的端口与路径。
⑥.在所有集群服务器的快照路径目录下分别新增myid文件,无后缀名,文件中填写数字,可从1到N,N为zookeeper的集群服务器数量。与cfg文件中的集群server配置key对应。
3.创建批量启动脚本:
桌面新建一个zookeeper-start.bat并编辑内容(安装自己的路径填写,title zk-server这个命令可以不要,目的只是为了给打开的命令行命名)
@echo off
start cmd /k "E: && cd E:\software\apache-zookeeper-3.7.0\bin && title zk-server0 && zkServer.cmd"
start cmd /k "E: && cd E:\software\apache-zookeeper-3.7.0-1\bin && title zk-server1 && zkServer.cmd"
start cmd /k "E: && cd E:\software\apache-zookeeper-3.7.0-2\bin && title zk-server2 && zkServer.cmd"
@echo on
然后保存,双击此文件,即可启动zookeeper集群了,如图:

三.kafka集群安装
1.下载安装:
下载kafka,下载地址:http://kafka.apache.org/downloads.html 记得下载source download,不要下载binary download,不然你得自己编译。解压kafka到自己的目录中,如:

2.配置:
进入config目录中,打开server.prooperties文件进行修改
①.设置唯一标识 broker.id=0
②.设置集群地址 broker.list=127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094
③.设置套接字(socket)服务器监听地址 listeners=PLAINTEXT://127.0.0.1:9092
④.配置日志文件存储位置 log.dirs=E:/software/kafka_2.12-2.8.0/logs
⑤.配置位移保留时间 offsets.retention.hours=168
⑥.配置滚动生成新的segment文件的最大时间 log.roll.hours=168
⑦.开启日志清理 log.cleaner.enable=true
⑧.zookeeper集群连接配置 zookeeper.connect=127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183
⑨.topic删除相关配置,如果不配置,只会做标记删除 delete.topic.enable=true
3.集群其他服务器配置:
如 二.2 中配置了三个kafka集群服务器地址,故复制两个server.properties文件出来,对的,你没听错,不用复制整个kafka文件夹,只需要在config下复制几个properties文件,如:

分别修改server1与server2,
①.节点id改为 broker.id = 1与 broker.id = 2;
②.listeners监听改为listeners=PLAINTEXT://:9093 与 listeners=PLAINTEXT://:9094
③.日志文件存储位置改为:
log.dirs=E:/software/kafka_2.12-2.8.0/logs-1 与 log.dirs=E:/software/kafka_2.12-2.8.0/logs-2
其他不动,配置完成
4.批量启动脚本
新建kafka-start.bat文件,编写内容:
@echo off
start cmd /k "E: && cd E:\software\kafka_2.12-2.8.0\bin\windows && title kafka-server0 && kafka-server-start.bat ..\..\config\server.properties"
start cmd /k "E: && cd E:\software\kafka_2.12-2.8.0\bin\windows && title kafka-server1 && kafka-server-start.bat ..\..\config\server1.properties"
start cmd /k "E: && cd E:\software\kafka_2.12-2.8.0\bin\windows && title kafka-server2 && kafka-server-start.bat ..\..\config\server2.properties"
@echo on
保存文件后,双击即可启动kafka集群,如图:

四.测试kafka
打开cmd命令行工具,进入目录E:\software\kafka_2.12-2.8.0\bin\windows 或 进入目录ctrl+右键-在此处打开命令行
1.创建topic
输入命令:kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic test_my

--replication-factor是主题的副本数量,不要设置超过broker
--partitions是主题的分区数量
2.查看topic
kafka-topics.bat --describe --zookeeper localhost:2181 --topic test_my
第一行是所有分区的摘要,其次,每一行提供一个分区信息,因为我们只有一个分区,所以只有一行。
"leader":该节点负责该分区的所有的读和写,每个节点的leader都是随机选择的。
"replicas":备份的节点列表,无论该节点是否是leader或者目前是否还活着,只是显示。
"isr":“同步备份”的节点列表,也就是活着的节点并且正在同步leader
3.测试消息的推送与消费:
①.打开创建者Producer:
kafka-console-producer.bat --broker-list localhost:9092 --topic test_my

②.打开消费者Consumer:
kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning

③.在创建者Producer输入想发送的消息,点击回车,在consumer端查看是否自动输出消息。
4.验证集群是否配置成功
①.使用命令查找并杀掉集群中的leader

②.再次查看topic状态
③.重新进入consumer,查看消息是否还在(别在意消息内容,此处我自己删过topic)
5.彻底删除topic
①.执行kafka删除命令,然后关闭Kafka
kafka-topics --delete --zookeeper localhost:2181 --topic test_my
如果kafaka启动时加载的配置文件中server.properties没有配置delete.topic.enable=true,那 么此时的删除并不是真正的删除,而是把topic标记为:marked for deletion
②.登录zookeeper客户端
打开cmd,进入目录E:\software\kafka_2.12-2.8.0\bin\windows 然后执行zkCli.cmd
Ⅰ. 查看topic信息
ls /brokers/topics
Ⅱ. 删除topic
删除所有: deleteall /brokers/topics
删除指定: deleteall /brokers/topics/test_my
③.删除kafka日志文件
删除server.properties配置的log.dirs文件夹下的以删除的topic开头的所有日志文件。
④.关闭后重新启动zookeeper
⑤.启动Kafka