目录
一、kafka开发环境安装
kafka的安装需要先安装docker,如果之前没有安装过docker,可以看我的这篇文章进行安装
为了方便一次性安装kafka的开发环境,我们使用github开源的镜像
安装命令,可以直接用,不用改动任何东西
docker run --rm -it \
-p 2181:2181 -p 9092:9092 -p 3030:3030 \
-p 8081:8081 -p 8082:8082 -p 8083:8083 \
-e ADV_HOST=127.0.0.1 \
landoop/fast-data-dev安装过程有点长,文件比较多,请耐心等待,如下是安装的服务列表
2021-07-19 07:32:06,764 INFO spawned: 'broker' with pid 173
2021-07-19 07:32:06,766 INFO spawned: 'caddy' with pid 174
2021-07-19 07:32:06,768 INFO spawned: 'connect-distributed' with pid 175
2021-07-19 07:32:06,772 INFO spawned: 'logs-to-kafka' with pid 177
2021-07-19 07:32:06,780 INFO spawned: 'rest-proxy' with pid 178
2021-07-19 07:32:06,787 INFO spawned: 'sample-data' with pid 179
2021-07-19 07:32:06,793 INFO spawned: 'schema-registry' with pid 180
2021-07-19 07:32:06,805 INFO spawned: 'smoke-tests' with pid 187
2021-07-19 07:32:06,809 INFO spawned: 'zookeeper' with pid 189安装完后,访问http://192.168.43.36:3030/可以看到如下界面,此ip是我linux的地址

当然你也可以一个一个的进行安装,下边是不错的安装文档
二、kafka相关操作
首先,我们在所有服务都启动的情况下,进入landoop/fast-data-dev容器
注意:此处我们需要重新开一个命令窗口,不要关闭当前服务的启动窗口
docker run --rm -it --net=host landoop/fast-data-dev bash1、使用命令创建topic
使用kafka-topics创建第一个topic:first_topic
kafka-topics \
--zookeeper 127.0.0.1:2181 \
--create --topic first_topic \
--partitions 3 \
--replication-factor 1创建topic,必须指定注册中心,topic的名字,创建多少个partitions,以及备份replication-factor的数量(实际中replication-factor应该为3,但我目前只在一台服务器上安装了kafka,只有一个broker,因此只能设置为1,因为replication-facto数量不能大于brokers的数量)
如果重复创建first_topic,服务会输出错误信息,从而保证topic的唯一性

2、列出所有的topic
// 同样需要带上注册中心的地址
kafka-topics --zookeeper 127.0.0.1:2181 --list3、删除topic:second_topic
kafka-topics --zookeeper 127.0.0.1:2181 --delete --topic second_topic4、查看topic的相关信息
kafka-topics --zookeeper 127.0.0.1:2181 --describe --topic first_topic可以看到以下相关信息,如Partition和replicationFactor的数量等
root@fast-data-dev / $ kafka-topics --zookeeper 127.0.0.1:2181 --describe --topic first_topic
Topic: first_topic PartitionCount: 3 ReplicationFactor: 1 Configs:
Topic: first_topic Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic: first_topic Partition: 1 Leader: 0 Replicas: 0 Isr: 0
Topic: first_topic Partition: 2 Leader: 0 Replicas: 0 Isr: 0
root@fast-data-dev / $ 三、kafka-console-producer-生产消息
使用kafka-console-producer生产消息,需要指定broker和topic,命令如下
kafka-console-producer --broker-list 127.0.0.1:9092 --topic first_topic输入如上命令回车后,会出现输入提示,我们输入如下消息
root@fast-data-dev / $ kafka-console-producer --broker-list 127.0.0.1:9092 --topic first_topic
>hi
>hello
>kafka
>very
>good
>producer
>然后进入kafka可视化页面,我们看到输入的message,被随机分配在了三个partition中

有时,我们需要把消息进行顺序的存储,而不是随机的分布在所有partition中,此时,我们可以设置一个key,那么相同key的消息都会被顺序的存储在一个partiton中
如下,我们加了两个属性,使用key,以及指定key和value的分隔符
kafka-console-producer --broker-list 127.0.0.1:9092 \
--topic first_topic \
--property "parse.key=true" \
--property "key.separator=:"我们来看下输入:
root@fast-data-dev / $ kafka-console-producer --broker-list 127.0.0.1:9092 \
--topic first_topic \
--property "parse.key=true" \
--property "key.separator=:"
>3:today
>3:brocker-ket
>3:kafka-key
>3:partition-key
>然后进入kafka可视化页面,我们看到输入的message,被顺序的存储在相同partiton中

四、kafka-console-consumer-消费消息
使用kafka-console-consumer消费消息,需要指定broker和topic,命令如下
kafka-console-consumer --bootstrap-server 127.0.0.1:9092 --topic first_topic输入如上命令时,如果此时我们没有生产message,那么我们将看不到任何效果,因为此时的消费者默认读取从当前启动时间后生产的message,因此启动时间之前的message都将不被消费
我们此时创建一个生产者,并输入一些message,来看看效果:

如果想要消费消息队列中所有的message(包括历史消息),需要加上参数 --from-beginning
kafka-console-consumer \
--bootstrap-server 127.0.0.1:9092 \
--topic first_topic \
--from-beginning如果需要指定消费的partition,可以加上--partition参数
root@fast-data-dev / $ kafka-console-consumer \
> --bootstrap-server 127.0.0.1:9092 \
> --topic first_topic \
> --from-beginning \
> --partition 2
world
today
brocker-ket
kafka-key
partition-key上边,我们指定消费partition2中的消息,一共消费了5条,我们通过UI界面,也可以看到partition2中确实有5条消息

ConsumerGruop
使用ConsumerGruop,把多个消费者划分到一个组中,组的名字通过group.id进行指定
kafka-console-consumer \
--bootstrap-server 127.0.0.1:9092 \
--topic first_topic \
--from-beginning \
--consumer-property group.id=my-group-name通过group.id划分到组的consumer,每消费完一个message,都会提交一个committed offset,当组内的下一个consumer读数据时,会从committed offset处开始读取。因此,分组后的consumer,并不会重复消费组内中任意一个consumer已经消费完成的消息。
附:kafka中message存储的json形式
[
{
"topic": "first_topic",
"key": "3",
"value": "world",
"partition": 2,
"offset": 0
},
{
"topic": "first_topic",
"key": "3",
"value": "today",
"partition": 2,
"offset": 1
},
{
"topic": "first_topic",
"key": "3",
"value": "brocker-ket",
"partition": 2,
"offset": 2
}
]