大数据平台测试-python往kafka发送数据

最近参与新项目,kafka鉴权相关的。需要做性能测试,在这里需要大量的数据,因此需要写个脚本。在这里记录下。

连接kafka tools,非常简单,填写下面的地址和改一下名称就可以连接了

 

这里我们先用kafka tools工具创建一个topic

分区数和副本数填一下,这里我创建的topic为 :test_lqj

这里没看到数据

 

编写代码

发送10个数据测试下

 用kafka tools工具看下

 成功。

付源码:

# -*- coding: utf-8 -*-
# @Author  : Liqiju
# @Time    : 2022/8/16 15:24
# @File    : send_kafka.py
# @Software: PyCharm

from kafka import KafkaProducer
import json
import random
import datetime
import time


class Kafka(object):

    def __init__(self,topic,json_data,bootstrap_servers):
        self.topic = topic
        self.json_data = json_data
        self.bootstrap_servers = bootstrap_servers

    def send_kafka(self):
        byte_data = json.dumps(self.json_data)
        msg = byte_data.encode('utf-8')
        producer.send(self.topic, msg)

if __name__ == '__main__':
    start_time = time.time()
    # topic,这里改为自己的topic
    topic = 'test_lqj'
    # 集群地址,这里改为自己的集群地址,就是Zookeeper Host 地址
    bootstrap_servers = 'XX.XX.XXX.XX,XX.XX.XXX.XX,10.XX.XXX.23'
    # 需要运行的次数,改为自己需要的次数
    sum = 10
    producer = KafkaProducer(bootstrap_servers=bootstrap_servers)
    for i in range(sum):
        score = random.randint(0, 100)  # 1到100随机数
        json_data = {"id": i, "score": score}  # data
        p1 = Kafka(topic, json_data, bootstrap_servers)
        p1.send_kafka()
        print('第'+str(i+1)+'个数据发送成功', datetime.datetime.now())
    producer.close()
    end_time = time.time()
    print('共发送'+ str(sum) + '个数据,'+'共耗时'+ str(end_time-start_time)+"秒")

 


版权声明:本文为qq_29720657原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。