Centos7之Kafka+ZooKeeper集群

目 录
1 ZooKeeper 2
1.1 Zookeeper概念介绍 2
1.2 ZooKeeper应用举例 4
1.3 ZooKeeper工作原理 6
1.3.1 Master启动 6
1.3.2 Master故障 7
1.3.3 Master恢复 7
1.4 ZooKeeper集群架构介绍 8
1.4.1 集群架构图 8
1.4.2 集群角色 8
1.4.3 ZooKeeper写流程 9
1.5 ZooKeeper集群模式配置 9
1.5.1 环境与角色说明 9
1.5.2 安装JDK以及设置环境变量 10
1.5.3 ZooKeeper下载及安装 11
1.5.4 配置ZooKeeper及配置参数介绍 12
1.5.5 配置zookeeper环境变量 13
1.5.6 服务启动并验证集群服务 13
1.6 ZooKeeper单机模式配置 15
1.7 案例分析及排错 15
1.8 面试题 15
2 Kafka 16
2.1 kafka基本概念 16
2.2 kafka角色术语 16
2.3 Kafka拓扑架构 17
2.4 Topic与Partition 17
2.5 Kafka消息发送的机制 18
2.6 Kafka消息消费机制 19
2.7 Kafka消息存储机制 20
2.8 安装并配置Kafka Broker集群 22
2.8.1 Kafka下载与安装 22
2.8.2 配置Kafka集群 22
2.8.3 启动Kafka集群 23
2.8.4 Kafka集群基本命令操作 24
2.8.5 注意事项 25
2.9 案例分析及排错 26
2.10 面试题 26

1 ZooKeeper
1.1 Zookeeper概念介绍
在介绍ZooKeeper之前,先来介绍一下分布式协调技术,所谓分布式协调技术主要用来解决分布式环境当中多个进程之间的同步控制,让他们有序的去访问某种共享资源,防止造成资源竞争(脑裂)的后果。
这里首先介绍一下什么是分布式系统,所谓分布式系统就是在不同地域分布的多个服务器,共同组成的一个应用系统来为用户提供服务,在分布式系统中最重要的是进程的调度,这里假设一个分布在三个地域的服务器组成的一个应用系统,在第一台服务器上挂载了一个资源,但我们又不希望多个进程同时进行访问,这个时候就需要一个协调器,来让它们有序的来访问这个资源。这个协调器就是分布式系统中经常提到的那个“锁”。

例如"进程1"在使用该资源的时候,会先去获得这把锁,"进程1"获得锁以后会对该资源保持独占,此时其它进程就无法访问该资源,"进程1"在用完该资源以后会将该锁释放掉,以便让其它进程来获得锁。由此可见,通过这个“锁”机制,就可以保证分布式系统中多个进程能够有序的访问该共享资源。这里把这个分布式环境下的这个“锁”叫作分布式锁。这个分布式锁就是分布式协调技术实现的核心内容。
综上所述,ZooKeeper是一种为分布式应用所设计的高可用、高性能的开源协调服务,它提供了一项基本服务:分布式锁服务,同时,也提供了数据的维护和管理机制,如:统一命名服务、状态同步服务、集群管理、分布式消息队列、分布式应用配置项的管理等等。
1.2 ZooKeeper应用举例
这里以ZooKeeper提供的基本服务分布式锁为例进行介绍。在分布式锁服务中,有一种最典型应用场景,就是通过对集群进行Master角色的选举,来解决分布式系统中的单点故障问题。所谓单点故障,就是在一个主从的分布式系统中,主节点负责任务调度分发,从节点负责任务的处理,而当主节点发生故障时,整个应用系统也就瘫痪了,那么这种故障就称为单点故障。
解决单点故障,传统的方式是采用一个备用节点,这个备用节点定期向主节点发送ping包,主节点收到ping包以后向备用节点发送回复Ack信息,当备用节点收到回复的时候就会认为当前主节点运行正常,让它继续提供服务。而当主节点故障时,备用节点就无法收到回复信息了,此时,备用节点就认为主节点宕机,然后接替它成为新的主节点继续提供服务。

这种传统解决单点故障的方法,虽然在一定程度上解决了问题,但是有一个隐患,就是网络问题,可能会存在这样一种情况:主节点并没有出现故障,只是在回复ack响应的时候网络发生了故障,这样备用节点就无法收到回复,那么它就会认为主节点出现了故障,接着,备用节点将接管主节点的服务,并成为新的主节点,此时,分布式系统中就出现了两个主节点(双Master节点)的情况,双Master节点的出现,会导致分布式系统的服务发生混乱。这样的话,整个分布式系统将变得不可用。为了防止出现这种情况,就需要引入ZooKeeper来解决这种问题。

1.3 ZooKeeper工作原理
下面通过三种情形,介绍下Zookeeper是如何进行工作的。
1.3.1 Master启动
在分布式系统中引入Zookeeper以后,就可以配置多个主节点,这里以配置两个主节点为例,假定它们是"主节点A"和"主节点B",当两个主节点都启动后,它们都会向ZooKeeper中注册节点信息。我们假设"主节点A"锁注册的节点信息是"master00001",“主节点B"注册的节点信息是"master00002”,注册完以后会进行选举,选举有多种算法,这里以编号最小作为选举算法,那么编号最小的节点将在选举中获胜并获得锁成为主节点,也就是"主节点A"将会获得锁成为主节点,然后"主节点B"将被阻塞成为一个备用节点。这样,通过这种方式Zookeeper就完成了对两个Master进程的调度。完成了主、备节点的分配和协作。

1.3.2 Master故障
如果"主节点A"发生了故障,这时候它在ZooKeeper所注册的节点信息会被自动删除,而ZooKeeper会自动感知节点的变化,发现"主节点A"故障后,会再次发出选举,这时候"主节点B"将在选举中获胜,替代"主节点A"成为新的主节点,这样就完成了主、被节点的重新选举。

1.3.3 Master恢复
如果主节点恢复了,它会再次向ZooKeeper注册自身的节点信息,只不过这时候它注册的节点信息将会变成"master00003",而不是原来的信息。ZooKeeper会感知节点的变化再次发动选举,这时候"主节点B"在选举中会再次获胜继续担任"主节点","主节点A"会担任备用节点。
Zookeeper就是通过这样的协调、调度机制如此反复的对集群进行管理和状态同步的。

1.4 ZooKeeper集群架构介绍
1.4.1 集群架构图
Zookeeper一般是通过集群架构来提供服务的,下图是Zookeeper的基本架构图。

1.4.2 集群角色
Zookeeper集群主要角色有Server和client,其中,Server又分为Leader、Follower和Observer三个角色,每个角色的含义如下:
Leader:领导者角色,主要负责投票的发起和决议,以及更新系统状态。
Follower:跟随者角色,用于接收客户端的请求并返回结果给客户端,在选举过程中参与投票。
Observer:观察者角色,用户接收客户端的请求,并将写请求转发给leader,同时同步leader状态,但不参与投票。Observer目的是扩展系统,提高伸缩性。
Client:客户端角色,用于向Zookeeper发起请求。
Zookeeper集群中每个Server在内存中存储了一份数据,在Zookeeper启动时,将从实例中选举一个Server作为leader,Leader负责处理数据更新等操作,当且仅当大多数Server在内存中成功修改数据,才认为数据修改成功。
1.4.3 ZooKeeper写流程
Zookeeper写的流程为:客户端Client首先和一个Server或者Observe通信,发起写请求,然后Server将写请求转发给Leader, Leader再将写请求转发给其它Server,其它Server在接收到写请求后写入数据并响应Leader,Leader在接收到大多数写成功回应后,认为数据写成功,最后响应Client,完成一次写操作过程。
1.5 ZooKeeper集群模式配置
1.5.1 环境与角色说明
对于集群模式下的ZooKeeper部署,官方建议至少要三台服务器,关于服务器的数量,推荐是奇数个(3、5、7、9等等),以实现ZooKeeper集群的高可用。
操作系统统一采用Centos7.4版本,各个服务器角色如下表所示:

1.5.2 安装JDK以及设置环境变量
Zookeeper依赖于Java环境,下面开始介绍如何安装及配置java环境变量
1 yum install java-1.8.0-openjdk
2 java –version

3 ll /usr/lib/jvm #默认jre jdk 安装路径是/usr/lib/jvm 下面,

4 vim /etc/profile #三台服务器均配置
#set java environment
export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.212.b04-0.el7_6.x86_64/jre
export CLASSPATH=.:J A V A H O M E / l i b / d t . j a r : JAVA_HOME/lib/dt.jar:JAVAHOME/lib/dt.jar:JAVA_HOME/lib/tools.jar:J A V A H O M E / j r e / l i b / r t . j a r e x p o r t P A T H = JAVA_HOME/jre/lib/rt.jar export PATH=JAVAHOME/jre/lib/rt.jarexportPATH=PATH:$JAVA_HOME/bin
5 source /etc/profile #生效配置 javac 和java 命令都有输出设置提示就表示安装和环境配置成功了

1.5.3 ZooKeeper下载及安装
ZooKeeper是用Java编写的,需要安装java运行环境(本文使用1.8),ZooKeeper官方网站:https://zookeeper.apache.org/,

1 wget https://mirrors.tuna.tsinghua.edu.cn/apache/zookeeper/zookeeper-3.4.14/zookeeper-3.4.14.tar.gz
2 tar zxvf zookeeper-3.4.14.tar.gz -C /usr/local/ #解压到/usr/local目录下
3 mv /usr/local/zookeeper-3.4.14 /usr/local/zookeeper/ #重命名
4 cd /usr/local/zookeeper/conf ; cp zoo_sample.cfg zoo.cfg
1.5.4 配置ZooKeeper及配置参数介绍
ZooKeeper安装到了/usr/local目录下,因此,zookeeper的配置模板文件为/usr/loca/zookeeper/conf/zoo_sample.cfg,拷贝并重命名为zoo.cfg,
1 cd /usr/local/zookeeper/conf ; cp zoo_sample.cfg zoo.cfg
重点配置内容如下(已经省略其他配置,具体可参考官方网站的介绍https://zookeeper.apache.org/doc/r3.4.14/zookeeperAdmin.html):
tickTime=2000
dataDir=/data/zookeeper
clientPort=2181
initLimit=10
syncLimit=5
server.1=192.168.85.137:2888:3888
server.2=192.168.85.138:2888:3888
server.3=192.168.85.139:2888:3888

每个配置项含义如下:
tickTime:zookeeper使用的基本时间度量单位,以毫秒为单位,它用来控制心跳和超时。2000表示2 tickTime。更低的tickTime值可以更快地发现超时问题。
initLimit:这个配置项是用来配置Zookeeper集群中Follower服务器初始化连接到Leader时,最长能忍受多少个心跳时间间隔数(也就是tickTime)l
syncLimit:这个配置项标识Leader与Follower之间发送消息,请求和应答时间长度最长不能超过多少个tickTime的时间长度
dataDir:必须配置项,用于配置存

  1. List item

储快照文件的目录。需要事先创建好这个目录,如果没有配置dataLogDir,那么事务日志也会存储在此目录。
clientPort:zookeeper服务进程监听的TCP端口,默认情况下,服务端会监听2181端口。
Server.A=B:C:D:其中A是一个数字,表示这是第几个服务器;B是这个服务器的IP地址;C表示的是这个服务器与集群中的Leader服务器通信的端口;D 表示如果集群中的Leader服务器宕机了,需要一个端口来重新进行选举,选出一个新的 Leader,而这个端口就是用来执行选举时服务器相互通信的端口。
除了修改zoo.cfg配置文件外,集群模式下还要配置一个文件myid,这个文件需要放在dataDir配置项指定的目录下,这个文件里面只有一个数字,如果要写入1,表示第一个服务器,与zoo.cfg文本中的server.1中的1对应,以此类推,在集群的第二个服务器zoo.cfg配置文件中dataDir配置项指定的目录下创建myid文件,写入2,这个2与zoo.cfg文本中的server.2中的2对应。Zookeeper在启动时会读取这个文件,得到里面的数据与zoo.cfg里面的配置信息比较,从而判断每个zookeeper server的对应关系。
为了保证zookeeper集群配置的规范性,建议将zookeeper集群中每台服务器的安装和配置文件路径都保存一致。

1 mkdir –pv /data/zookeeper #在三台服务器上192.168.85.137/138/139
2 cd /data/zookeeper #在三台服务器上192.168.85.137/138/139
3 echo 1 > myid #192.168.85.137
echo 2 > myid #192.168.85.138
echo 3 > myid #192.168.85.139
1.5.5 配置zookeeper环境变量
每次启动zookeeper,需要切换到对应的目录下,运行zkServer.sh来进行服务的启动和关闭等操作,使用不方便。为了在任意路径都可以执行“zkServer.sh start”命令,添加环境变量的内容如下:
1 vim /etc/profile #在三台服务器上192.168.85.137/138/139
2 export ZOOKEEPER_HOME=/usr/local/zookeeper
export PATH=P A T H : PATH:PATH:ZOOKEEPER_HOME/bin
#插入到文末位置
3 source /etc/profile #生效环境变量
1.5.6 服务启动并验证集群服务
1 zkServer.sh start #在三台服务器上192.168.85.137/138/139

2 zkServer.sh status #查看其状态,也可以使用ps –ef|grep zookeeper查看进程,此处有1个leader,2个follower,说明集群服务正常。

3 jps #查看java进程对应的QuorumPeerMain

Zookeeper启动后,通过jps命令(jdk内置命令)可以看到有一个QuorumPeerMain标识,这个就是Zookeeper启动的进程,前面的数字是Zookeeper进程的PID。
4 zkCli.sh -server 192.168.85.137:2181

到此,按照以上个步骤可知zookeeper集群是否正常启动。

1.6ZooKeeper单机模式配置
可参照集群方式配置,仅配置一台服务器即可,测试方法相同。

1.7 案例分析及排错
如出现错误,请查看日志文件来判断问题点,日志文件的目录如下:
/usr/local/zookeeper/conf/zookeeper.out。
待更新。。。

1.8 面试题
待更新。。。

2 Kafka
2.1 kafka基本概念
Kafka是一种高吞吐量的分布式发布/订阅消息系统,这是官方对kafka的定义,kafka是Apache组织下的一个开源系统,它的最大的特性就是可以实时的处理大量数据以满足各种需求场景:比如基于hadoop平台的数据分析、低时延的实时系统、storm/spark流式处理引擎等。kafka现在它已被多家大型公司作为多种类型的数据管道和消息系统使用。
2.2 kafka角色术语
在介绍架构之前,先了解下kafka中一些核心概念和各种角色。
Broker:Kafka集群包含一个或多个服务器,每个服务器被称为broker。
Topic:每条发布到Kafka集群的消息都有一个分类,这个类别被称为Topic(主题)。
Producer:指消息的生产者,负责发布消息到Kafka broker。
Consumer :指消息的消费者,从Kafka broker拉取数据,并消费这些已发布的消息。
Partition:Parition是物理上的概念,每个Topic包含一个或多个Partition,每个partition都是一个有序的队列。partition 中的每条消息都会被分配一个有序的id(称为offset)。
Consumer Group:消费者组,可以给每个Consumer指定消费者组,若不指定消费者组,则属于默认的group。
Message:消息,通信的基本单位,每个producer可以向一个topic发布一些消息。
2.3 Kafka拓扑架构
一个典型的Kafka集群包含若干Producer,若干broker、若干Consumer Group,以及一个Zookeeper集群。Kafka通过Zookeeper管理集群配置,选举leader,以及在Consumer Group发生变化时进行rebalance。Producer使用push模式将消息发布到broker,Consumer使用pull模式从broker订阅并消费消息。

2.4 Topic与Partition
Kafka中的topic是以partition的形式存放的,每一个topic都可以设置它的partition数量,Partition的数量决定了组成topic的log的数量。推荐partition的数量一定要大于同时运行的consumer的数量。另外,建议partition的数量大于集群broker的数量,这样消息数据就可以均匀的分布在各个broker中。
那么,Topic为什么要设置多个Partition呢,这是因为kafka是基于文件存储的,通过配置多个partition可以将消息内容分散存储到多个broker上,这样可以避免文件尺寸达到单机磁盘的上限。同时,将一个topic切分成任意多个partitions,可以保证消息存储、消息消费的效率,因为越多的partitions可以容纳更多的consumer,可有效提升Kafka的吞吐率。因此,将Topic切分成多个partitions的好处是可以将大量的消息分成多批数据同时写到不同节点上,将写请求分担负载到各个集群节点。
2.5 Kafka消息发送的机制
每当用户往某个Topic发送数据时,数据会被hash到不同的partition,这些partition位于不同的集群节点上,所以每个消息都会被记录一个offset消息号,就是offset号。消费者通过这个offset号去查询读取这个消息。
发送消息流程为:
首先获取topic的所有Patition,如果客户端不指定Patition,也没有指定Key的话,使用自增长的数字取余数的方式实现指定的Partition。这样Kafka将平均的向Partition中生产数据。如果想要控制发送的partition,则有两种方式,一种是指定partition,另一种就是根据Key自己写算法。实现其partition方法。
每一条消息被发送到broker时,会根据paritition规则选择被存储到哪一个partition。如果partition规则设置的合理,所有消息可以均匀分布到不同的partition里,这样就实现了水平扩展。同时,每条消息被append到partition中时,是顺序写入磁盘的,因此效率非常高,经验证,顺序写磁盘效率比随机写内存还要高,这是Kafka高吞吐率的一个很重要的保证。

2.6 Kafka消息消费机制
Kafka中的Producer和consumer采用的是push(推送)、pull(拉取)的模式,即Producer只是向broker push消息,consumer只是从broker pull消息,push和pull对于消息的生产和消费是异步进行的。pull模式的一个好处是Consumer可自主控制消费消息的速率,同时Consumer还可以自己控制消费消息的方式是批量的从broker拉取数据还是逐条消费数据。
当生产者将数据发布到topic时,消费者通过pull的方式,定期从服务器拉取数据,当然在pull数据的时候,服务器会告诉consumer可消费的消息offset。
消费规则:
1、不同 Consumer Group下的消费者可以消费partition中相同的消息,相同的Consumer Group下的消费者只能消费partition中不同的数据。
2、topic的partition的个数和同一个消费组的消费者个数最好一致,如果消费者个数多于partition个数,则会存在有的消费者消费不到数据。
3、服务器会记录每个consumer的在每个topic的每个partition下的消费的offset,然后每次去消费去拉取数据时,都会从上次记录的位置开始拉取数据。

2.7 Kafka消息存储机制
在存储结构上,每个partition在物理上对应一个文件夹,该文件夹下存储这个partition的所有消息和索引文件,每个partion(目录)相当于一个巨型文件被平均分配到多个大小相等segment(段)数据文件中。
partiton命名规则为topic名称+序号,第一个partiton序号从0开始,序号最大值为partitions数量减1。
在每个partition (文件夹)中有多个大小相等的segment(段)数据文件,每个segment的大小是相同的,但是每条消息的大小可能不相同,因此segment 数据文件中消息数量不一定相等。

segment数据文件有两个部分组成,分别为index file和data file,此两个文件是一一对应,成对出现,后缀".index"和“.log”分别表示为segment索引文件和数据文件。

其实Kafka最核心的思想是使用磁盘,而不是使用内存,使用磁盘操作有以下几个好处:
1、磁盘缓存由Linux系统维护,减少了程序员的不少工作。
2、磁盘顺序读写速度超过内存随机读写。
3、JVM的GC效率低,内存占用大。使用磁盘可以避免这一问题。
4、系统冷启动后,磁盘缓存依然可用。
2.8 安装并配置Kafka Broker集群
这里将kafka和zookeeper部署在一起了。另外,由于是部署集群模式的kafka,因此下面的操作需要在每个集群节点都执行一遍。
2.8.1 Kafka下载与安装
可以从kafka官网https://kafka.apache.org/downloads获取kafka安装包,这里推荐的版本是kafka_2.10-0.10.0.1.tgz,将下载下来的安装包直接解压到一个路径下即可完成kafka的安装,这里统一将kafka安装到/usr/local目录下,基本操作过程如下:
1 wget https://archive.apache.org/dist/kafka/0.10.0.1/kafka_2.10-0.10.0.1.tgz
2 tar -zxvf kafka_2.10-0.10.0.1.tgz -C /usr/local
3 mv /usr/local/kafka_2.10-0.10.0.1 /usr/local/kafka
#这里我们将kafka安装到了/usr/local目录下
2.8.2 配置Kafka集群
这里将kafka安装到/usr/local目录下,因此,kafka的主配置文件为/usr/local/kafka/config/server.properties,这里以节点kafkazk1为例,重点介绍一些常用配置项的含义:
1 broker.id=1
listeners=PLAINTEXT://192.168.85.137:9092
log.dirs=/usr/local/kafka/logs
num.partitions=6
log.retention.hours=60
log.segment.bytes=1073741824
zookeeper.connect=192.168.85.137:2181,192.168.85.138:2181,192.168.85.139:2181
auto.create.topics.enable=true
delete.topic.enable=true
每个配置项含义如下:
broker.id:每一个broker在集群中的唯一表示,要求是正数。当该服务器的IP地址发生改变时,broker.id没有变化,则不会影响consumers的消息情况。
listeners:设置kafka的监听地址与端口,可以将监听地址设置为主机名或IP地址,这里将监听地址设置为IP地址。
log.dirs:这个参数用于配置kafka保存数据的位置,kafka中所有的消息都会存在这个目录下。可以通过逗号来指定多个路径, kafka会根据最少被使用的原则选择目录分配新的parition。需要注意的是,kafka在分配parition的时候选择的规则不是按照磁盘的空间大小来定的,而是根据分配的 parition的个数多小而定。
num.partitions:这个参数用于设置新创建的topic有多少个分区,可以根据消费者实际情况配置,配置过小会影响消费性能。这里配置6个。
log.retention.hours:这个参数用于配置kafka中消息保存的时间,还支持log.retention.minutes和 log.retention.ms配置项。这三个参数都会控制删除过期数据的时间,推荐使用log.retention.ms。如果多个同时设置,那么会选择最小的那个。
log.segment.bytes:配置partition中每个segment数据文件的大小,默认是1GB,超过这个大小会自动创建一个新的segment file。
zookeeper.connect:这个参数用于指定zookeeper所在的地址,它存储了broker的元信息。 这个值可以通过逗号设置多个值,每个值的格式均为:hostname:port/path,每个部分的含义如下:
hostname:表示zookeeper服务器的主机名或者IP地址,这里设置为IP地址。
port: 表示是zookeeper服务器监听连接的端口号。
/path:表示kafka在zookeeper上的根目录。如果不设置,会使用根目录。
auto.create.topics.enable:这个参数用于设置是否自动创建topic,如果请求一个topic时发现还没有创建, kafka会在broker上自动创建一个topic,如果需要严格的控制topic的创建,那么可以设置auto.create.topics.enable为false,禁止自动创建topic。
delete.topic.enable:在0.8.2版本之后,Kafka提供了删除topic的功能,但是默认并不会直接将topic数据物理删除。如果要从物理上删除(即删除topic后,数据文件也会一同删除),就需要设置此配置项为true。
2.8.3 启动Kafka集群
在启动kafka集群前,需要确保ZooKeeper集群已经正常启动。接着,依次在kafka各个节点上执行如下命令即可:
1 cd /usr/local/kafka
2 nohup bin/kafka-server-start.sh config/server.properties &
3 jps

这里将kafka放到后台运行,启动后,会在启动kafka的当前目录下生成一个nohup.out日志文件,可通过此文件查看kafka的启动和运行状态。通过jps指令,可以看到有个Kafka标识,这是kafka进程成功启动的标志。
2.8.4 Kafka集群基本命令操作
kefka提供了多个命令用于查看、创建、修改、删除topic信息,也可以通过命令测试如何生产消息、消费消息等,这些命令位于kafka安装目录的bin目录下,这里是/usr/local/kafka/bin。登录任意一台kafka集群节点,切换到此目录下,即可进行命令操作。下面列举kafka的一些常用命令的使用方法。
(1)创建一个mykafkatopic,并指定topic属性(副本数、分区数等)
1 bin/kafka-topics.sh --create --zookeeper 192.168.85.137:2181,192.168.85.138:2181,192.168.85.139:2181 --replication-factor 1 --partitions 3 --topic mykafkatopic

#replication-factor topic的副本信息。
#partitions3 的数量和消费者有关
#topic ** 指定topic的名称

(2)显示topic列表
1 bin/kafka-topics.sh --create --zookeeper 192.168.85.137:2181,192.168.85.138:2181,192.168.85.139:2181 --replication-factor 1 --partitions 3 --topic mykafkatopic

(3)查看某个topic的状态
1 bin/kafka-topics.sh --describe --zookeeper 192.168.85.137:2181,192.168.85.138:2181,192.168.85.139:2181 --topic mytopic
(4)生产消息
1 bin/kafka-console-producer.sh --broker-list 192.168.85.137:9092,192.168.85.138:9092,192.168.85.139:9092 --topic mytopic
(5)消费消息
1 bin/kafka-console-consumer.sh --zookeeper 192.168.85.137:2181,192.168.85.138:2181,192.168.85.139:2181 --topic mytopic
#–from-beginning 表示从头到尾输出消息,之前的消息可能是乱序的。
(6)删除topic
1 bin/kafka-topics.sh --zookeeper 192.168.85.137:2181,192.168.85.138:2181,192.168.85.139:2181 --delete --topic mykafkatopic
2.8.5 注意事项
一定要先启动ZooKeeper 再启动Kafka ,顺序不可以改变;
1 1.zkServer.sh start
2.zkServer.sh status
cd /usr/local/kafka
nohup bin/kafka-server-start.sh config/server.properties &

一定要先关闭kafka,再关闭zookeeper。
1 cd /usr/local/kafka
bin/kafka-server-stop.sh
zkServer.sh stop

2.9 案例分析及排错
待更新。。。
2.10 面试题
待更新。。。


版权声明:本文为free_9527原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。