最近参与新项目,kafka鉴权相关的。需要做性能测试,在这里需要大量的数据,因此需要写个脚本。在这里记录下。
连接kafka tools,非常简单,填写下面的地址和改一下名称就可以连接了
这里我们先用kafka tools工具创建一个topic
分区数和副本数填一下,这里我创建的topic为 :test_lqj
这里没看到数据
编写代码
发送10个数据测试下
用kafka tools工具看下
成功。
付源码:
# -*- coding: utf-8 -*-
# @Author : Liqiju
# @Time : 2022/8/16 15:24
# @File : send_kafka.py
# @Software: PyCharm
from kafka import KafkaProducer
import json
import random
import datetime
import time
class Kafka(object):
def __init__(self,topic,json_data,bootstrap_servers):
self.topic = topic
self.json_data = json_data
self.bootstrap_servers = bootstrap_servers
def send_kafka(self):
byte_data = json.dumps(self.json_data)
msg = byte_data.encode('utf-8')
producer.send(self.topic, msg)
if __name__ == '__main__':
start_time = time.time()
# topic,这里改为自己的topic
topic = 'test_lqj'
# 集群地址,这里改为自己的集群地址,就是Zookeeper Host 地址
bootstrap_servers = 'XX.XX.XXX.XX,XX.XX.XXX.XX,10.XX.XXX.23'
# 需要运行的次数,改为自己需要的次数
sum = 10
producer = KafkaProducer(bootstrap_servers=bootstrap_servers)
for i in range(sum):
score = random.randint(0, 100) # 1到100随机数
json_data = {"id": i, "score": score} # data
p1 = Kafka(topic, json_data, bootstrap_servers)
p1.send_kafka()
print('第'+str(i+1)+'个数据发送成功', datetime.datetime.now())
producer.close()
end_time = time.time()
print('共发送'+ str(sum) + '个数据,'+'共耗时'+ str(end_time-start_time)+"秒")
版权声明:本文为qq_29720657原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。