kafka安装和相关命令操作——修改中

目录

一、kafka开发环境安装

二、kafka相关操作

1、使用命令创建topic

2、列出所有的topic

3、删除topic:second_topic

4、查看topic的相关信息

三、kafka-console-producer-生产消息

四、kafka-console-consumer-消费消息


一、kafka开发环境安装

kafka的安装需要先安装docker,如果之前没有安装过docker,可以看我的这篇文章进行安装

Docker镜像、容器和数据管理

为了方便一次性安装kafka的开发环境,我们使用github开源的镜像

GitHub - lensesio/fast-data-dev: Kafka Docker for development. Kafka, Zookeeper, Schema Registry, Kafka-Connect, Landoop Tools, 20+ connectors

安装命令,可以直接用,不用改动任何东西

docker run --rm -it \
-p 2181:2181 -p 9092:9092 -p 3030:3030 \
-p 8081:8081 -p 8082:8082 -p 8083:8083 \
-e ADV_HOST=127.0.0.1 \
landoop/fast-data-dev

安装过程有点长,文件比较多,请耐心等待,如下是安装的服务列表

2021-07-19 07:32:06,764 INFO spawned: 'broker' with pid 173
2021-07-19 07:32:06,766 INFO spawned: 'caddy' with pid 174
2021-07-19 07:32:06,768 INFO spawned: 'connect-distributed' with pid 175
2021-07-19 07:32:06,772 INFO spawned: 'logs-to-kafka' with pid 177
2021-07-19 07:32:06,780 INFO spawned: 'rest-proxy' with pid 178
2021-07-19 07:32:06,787 INFO spawned: 'sample-data' with pid 179
2021-07-19 07:32:06,793 INFO spawned: 'schema-registry' with pid 180
2021-07-19 07:32:06,805 INFO spawned: 'smoke-tests' with pid 187
2021-07-19 07:32:06,809 INFO spawned: 'zookeeper' with pid 189

安装完后,访问http://192.168.43.36:3030/可以看到如下界面,此ip是我linux的地址

 当然你也可以一个一个的进行安装,下边是不错的安装文档

Docker搭建kafka和zookeeper:http://blog.70ci.com/post/736.html

二、kafka相关操作

首先,我们在所有服务都启动的情况下,进入landoop/fast-data-dev容器

注意:此处我们需要重新开一个命令窗口,不要关闭当前服务的启动窗口

docker run --rm -it --net=host landoop/fast-data-dev bash

1、使用命令创建topic

使用kafka-topics创建第一个topic:first_topic

kafka-topics \
--zookeeper 127.0.0.1:2181 \
--create --topic first_topic \
--partitions 3 \
--replication-factor 1

创建topic,必须指定注册中心,topic的名字,创建多少个partitions,以及备份replication-factor的数量(实际中replication-factor应该为3,但我目前只在一台服务器上安装了kafka,只有一个broker,因此只能设置为1,因为replication-facto数量不能大于brokers的数量

如果重复创建first_topic,服务会输出错误信息,从而保证topic的唯一性

2、列出所有的topic

// 同样需要带上注册中心的地址
kafka-topics --zookeeper 127.0.0.1:2181 --list

3、删除topic:second_topic

kafka-topics --zookeeper 127.0.0.1:2181 --delete --topic second_topic

4、查看topic的相关信息

kafka-topics --zookeeper 127.0.0.1:2181 --describe --topic first_topic

可以看到以下相关信息,如Partition和replicationFactor的数量等

root@fast-data-dev / $ kafka-topics --zookeeper 127.0.0.1:2181 --describe --topic first_topic
Topic: first_topic      PartitionCount: 3       ReplicationFactor: 1    Configs: 
        Topic: first_topic      Partition: 0    Leader: 0       Replicas: 0     Isr: 0
        Topic: first_topic      Partition: 1    Leader: 0       Replicas: 0     Isr: 0
        Topic: first_topic      Partition: 2    Leader: 0       Replicas: 0     Isr: 0
root@fast-data-dev / $ 

三、kafka-console-producer-生产消息

使用kafka-console-producer生产消息,需要指定broker和topic,命令如下

kafka-console-producer --broker-list 127.0.0.1:9092 --topic first_topic

输入如上命令回车后,会出现输入提示,我们输入如下消息

root@fast-data-dev / $ kafka-console-producer --broker-list 127.0.0.1:9092 --topic first_topic
>hi
>hello
>kafka
>very
>good
>producer
>

然后进入kafka可视化页面,我们看到输入的message,被随机分配在了三个partition中

有时,我们需要把消息进行顺序的存储,而不是随机的分布在所有partition中,此时,我们可以设置一个key,那么相同key的消息都会被顺序的存储在一个partiton中

如下,我们加了两个属性,使用key,以及指定key和value的分隔符

kafka-console-producer --broker-list 127.0.0.1:9092 \
--topic first_topic \
--property "parse.key=true" \
--property "key.separator=:"

我们来看下输入:

root@fast-data-dev / $ kafka-console-producer --broker-list 127.0.0.1:9092 \
--topic first_topic \
--property "parse.key=true" \
--property "key.separator=:"
>3:today
>3:brocker-ket
>3:kafka-key
>3:partition-key
>

然后进入kafka可视化页面,我们看到输入的message,被顺序的存储在相同partiton中

四、kafka-console-consumer-消费消息

使用kafka-console-consumer消费消息,需要指定broker和topic,命令如下

kafka-console-consumer --bootstrap-server 127.0.0.1:9092 --topic first_topic

输入如上命令时,如果此时我们没有生产message,那么我们将看不到任何效果,因为此时的消费者默认读取从当前启动时间后生产的message,因此启动时间之前的message都将不被消费

我们此时创建一个生产者,并输入一些message,来看看效果:

如果想要消费消息队列中所有的message(包括历史消息),需要加上参数 --from-beginning

kafka-console-consumer \
--bootstrap-server 127.0.0.1:9092 \
--topic first_topic \
--from-beginning

如果需要指定消费的partition,可以加上--partition参数

root@fast-data-dev / $ kafka-console-consumer \
> --bootstrap-server 127.0.0.1:9092 \
> --topic first_topic \
> --from-beginning \
> --partition 2
world
today
brocker-ket
kafka-key
partition-key

上边,我们指定消费partition2中的消息,一共消费了5条,我们通过UI界面,也可以看到partition2中确实有5条消息

ConsumerGruop

使用ConsumerGruop,把多个消费者划分到一个组中,组的名字通过group.id进行指定

kafka-console-consumer \
--bootstrap-server 127.0.0.1:9092 \
--topic first_topic \
--from-beginning \
--consumer-property group.id=my-group-name

通过group.id划分到组的consumer,每消费完一个message,都会提交一个committed offset,当组内的下一个consumer读数据时,会从committed offset处开始读取。因此,分组后的consumer,并不会重复消费组内中任意一个consumer已经消费完成的消息。

附:kafka中message存储的json形式

[ 
  {
    "topic": "first_topic",
    "key": "3",
    "value": "world",
    "partition": 2,
    "offset": 0
  },
  {
    "topic": "first_topic",
    "key": "3",
    "value": "today",
    "partition": 2,
    "offset": 1
  },
  {
    "topic": "first_topic",
    "key": "3",
    "value": "brocker-ket",
    "partition": 2,
    "offset": 2
  }
]

版权声明:本文为swadian2008原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。