python3多进程写Kafka异步线程调用接口

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author: 风过无言花易落
# @Date  : 2022/02/14 22:30
# @Desc  : 进程写Kafka,线程调接口

from confluent_kafka import Producer
import json
import timeit,time,os
from faker import Faker
import multiprocessing as mp
import random
import datetime
import string
import threading
import math
import requests
import logging
import logging.handlers

# Module-level Faker instance created with the 'zh-CN' locale.
# NOTE(review): no code visible in this file reads `f` — confirm it is unused before removing.
f = Faker(locale='zh-CN')
class logs(object):
    """Small wrapper around a named ``logging.Logger``.

    The first wrapper created for a given logger name attaches two handlers:
    a size-rotated file handler writing to ``log/log-<YYYY-MM-DD>.log`` and a
    console handler. Later wrappers for the same name reuse those handlers
    instead of opening another file handle, so records are never duplicated
    and no file handler is leaked.
    """

    # Accepted level names. 'NOSET' is a historical misspelling that is kept
    # so existing callers keep working; 'NOTSET' is the correct spelling.
    LEVELS = {'NOSET': logging.NOTSET,
              'NOTSET': logging.NOTSET,
              'DEBUG': logging.DEBUG,
              'INFO': logging.INFO,
              'WARNING': logging.WARNING,
              'ERROR': logging.ERROR,
              'CRITICAL': logging.CRITICAL}

    def __init__(self, level, logger=None):
        """Configure (or reuse) the logger called *logger*.

        Args:
            level: One of the keys of ``LEVELS``; applied to the logger on
                every construction.
            logger: Name handed to ``logging.getLogger``; ``None`` selects
                the root logger.

        Raises:
            TypeError: Raised by ``Logger.setLevel`` when *level* is not a
                known name (the lookup yields ``None``).
        """
        self.logger = logging.getLogger(logger)
        # Resolve and apply the level first so that a bad level fails before
        # any file is opened.
        self.logger.setLevel(self.LEVELS.get(level))

        # Handlers already attached: reuse them rather than stacking
        # duplicates (which would repeat every record and leak file handles).
        if self.logger.handlers:
            return

        # Make sure the log directory exists; exist_ok avoids a race between
        # several processes starting at the same time.
        logs_dir = "log"
        os.makedirs(logs_dir, exist_ok=True)

        # One log file per calendar day, rotated at 50 MiB, up to 500 backups.
        timestamp = time.strftime("%Y-%m-%d", time.localtime())
        logfilepath = os.path.join(logs_dir, "log-%s.log" % timestamp)
        rotatingFileHandler = logging.handlers.RotatingFileHandler(filename=logfilepath,
                                                                   maxBytes=1024 * 1024 * 50,
                                                                   backupCount=500)
        formatter = logging.Formatter('[%(asctime)s] [%(levelname)s] %(message)s', '%Y-%m-%d %H:%M:%S')
        rotatingFileHandler.setFormatter(formatter)

        # Console handler; filtering is left to the logger's own level.
        console = logging.StreamHandler()
        console.setFormatter(formatter)

        self.logger.addHandler(rotatingFileHandler)
        self.logger.addHandler(console)

    def info(self, message):
        """Log *message* at INFO level."""
        self.logger.info(message)

    def debug(self, message):
        """Log *message* at DEBUG level."""
        self.logger.debug(message)

    def warning(self, message):
        """Log *message* at WARNING level."""
        self.logger.warning(message)

    def error(self, message):
        """Log *message* at ERROR level."""
        self.logger.error(message)

class CreateIp(object):
    """Random IPv4 address generator keyed by province code.

    ``ipv4_prov_prefix`` maps a province code (string) to the list of
    three-octet prefixes (with trailing dot) assigned to that province.
    """

    def __init__(self):
        # NOTE(review): key '79' was spelled '168' and the prefix
        # '192.168.179.' was spelled '192.168.1168.' (not a valid octet).
        # Both look like the same search-and-replace accident; the producer
        # in this file draws province '79', so the key is restored here.
        # Confirm against the authoritative table.
        self.ipv4_prov_prefix = {
            '10': ['192.168.169.', '192.168.239.', '192.168.135.'],
            '11': ['192.168.197.', '192.168.243.', '192.168.128.', '192.168.165.'],
            '13': ['192.168.212.', '192.168.132.', '192.168.166.'],
            '17': ['192.168.229.', '112.224.240.', '192.168.142.', '192.168.177.', '192.168.198.', '192.168.200.'],
            '18': ['192.168.217.', '192.168.238.', '192.168.133.', '192.168.167.', '192.168.201.', '192.168.202.'],
            '19': ['192.168.168.', '192.168.242.', '192.168.241.', '192.168.134.'],
            '30': ['192.168.178.', '192.168.244.', '192.168.143.'],
            '31': ['192.168.222.', '192.168.237.', '192.168.129.', '192.168.179.', '192.168.221.'],
            '34': ['192.168.176.', '192.168.245.', '192.168.246.', '192.168.232.', '192.168.131.'],
            '36': ['192.168.168.', '192.168.144.', '192.168.180.'],
            '38': ['192.168.226.', '192.168.145.', '192.168.181.', '192.168.182.'],
            '50': ['192.168.185.', '192.168.147.'],
            '51': ['192.168.216.', '192.168.130.', '192.168.184.', '192.168.199.', '192.168.203.', '192.168.204.',
                   '192.168.205.', '192.168.214.', '192.168.215.'],
            '59': ['192.168.186.', '192.168.148.'],
            '70': ['192.168.194.', '192.168.156.'],
            '71': ['192.168.173.', '192.168.139.'],
            '74': ['192.168.219.', '192.168.140.', '192.168.174.', '192.168.218.'],
            '75': ['192.168.183.', '192.168.146.'],
            '76': ['192.168.234.', '192.168.141.', '192.168.175.', '192.168.207.', '192.168.208.', '192.168.223.'],
            '79': ['192.168.191.', '192.168.153.'],
            '81': ['192.168.231.', '192.168.149.', '192.168.187.', '192.168.209.', '192.168.230.'],
            '83': ['192.168.211.', '192.168.150.', '192.168.188.', '192.168.210.'],
            '84': ['192.168.192.', '192.168.154.'],
            '85': ['192.168.228.', '192.168.151.', '192.168.189.', '192.168.227.'],
            '86': ['192.168.225.', '192.168.152.', '192.168.190.', '192.168.224.'],
            '87': ['192.168.193.', '192.168.236.', '192.168.235.', '192.168.155.'],
            '88': ['192.168.195.', '192.168.157.'],
            '89': ['192.168.213.', '192.168.233.', '192.168.158.', '192.168.196.'],
            '90': ['192.168.171.', '192.168.137.'],
            '91': ['192.168.192.', '192.168.240.', '192.168.136.', '192.168.170.'],
            '97': ['192.168.172.', '192.168.138.']
            }

    def create_ipv4(self, roam_type, prov):
        """Return a random dotted-quad address as a string.

        Args:
            roam_type: ``0`` draws the prefix from *prov* itself; ``1`` draws
                it from a randomly chosen province other than *prov*.
            prov: Province code (a key of ``ipv4_prov_prefix``).

        Returns:
            A prefix from the selected province followed by a last octet in
            the range 1..125.

        Raises:
            KeyError: *roam_type* is 0 and *prov* is not in the table.
            ValueError: *roam_type* is neither 0 nor 1.
        """
        if roam_type == 0:
            source_prov = prov
        elif roam_type == 1:
            # Pick among the other provinces without mutating the table, so
            # the instance stays reusable across calls.
            others = [code for code in self.ipv4_prov_prefix if code != prov]
            source_prov = random.choice(others)
        else:
            raise ValueError('roam_type must be 0 or 1, got {!r}'.format(roam_type))
        return random.choice(self.ipv4_prov_prefix[source_prov]) + str(random.randint(1, 125))

def getRandomString(number):
    """Return a random string of *number* ASCII letters and digits.

    Characters are drawn independently (with replacement), so any length is
    supported and characters may repeat.

    Raises:
        ValueError: *number* is negative.
    """
    if number < 0:
        raise ValueError('number must be non-negative, got {!r}'.format(number))
    alphabet = string.ascii_letters + string.digits
    # random.choices samples with replacement; random.sample would cap the
    # length at len(alphabet) and never repeat a character.
    return "".join(random.choices(alphabet, k=number))

def randomtimes(start, end, n, frmt="%Y-%m-%d %H:%M:%S"):
    """Return one random timestamp between *start* and *end*, formatted with *frmt*.

    *start* and *end* are parsed with *frmt*. *n* random instants are drawn
    and formatted, but only the first one is returned, so *n* must be at
    least 1 (``n == 0`` raises ``IndexError``).
    """
    lower = datetime.datetime.strptime(start, frmt)
    upper = datetime.datetime.strptime(end, frmt)
    span = upper - lower
    moments = []
    for _ in range(n):
        # Scale the whole interval by a uniform factor in [0, 1).
        moments.append(random.random() * span + lower)
    formatted = [moment.strftime(frmt) for moment in moments]
    return formatted[0]

def create_phone():
    """Return a random 11-digit mobile-style number as a string.

    Layout: a leading ``1``, a second digit from {3, 4, 5, 7, 8}, a third
    digit restricted by the second digit, then eight random digits.
    """
    # Allowed third digits for each possible second digit.
    third_digit_options = {
        3: list(range(10)),
        4: [5, 7, 9],
        5: [d for d in range(10) if d != 4],
        7: [d for d in range(10) if d not in (4, 9)],
        8: list(range(10)),
    }
    second = random.choice((3, 4, 5, 7, 8))
    third = random.choice(third_digit_options[second])
    # Always exactly eight digits; zero-padded so the total length stays 11.
    suffix = "{:08d}".format(random.randrange(10 ** 8))
    return "1{}{}{}".format(second, third, suffix)

# 这里的参数包括一个基准点,和一个距离基准点的距离
def generate_random_gps(base_log=None, base_lat=None, radius=None):
    """Return a random point around a base coordinate.

    Args:
        base_log: Base longitude. If this or *base_lat* is ``None``, the
            built-in defaults (136.55491, 49.919034, radius 1000000) replace
            all three arguments.
        base_lat: Base latitude.
        radius: Maximum distance from the base point; it is divided by
            111300 to convert it to degrees. Required whenever both base
            coordinates are supplied.

    Returns:
        ``(longitude, latitude)`` as strings formatted with six decimals.

    Raises:
        ValueError: Both base coordinates were given but *radius* is ``None``.
    """
    if base_log is None or base_lat is None:
        base_log = 136.55491
        base_lat = 49.919034
        radius = 1000000
    elif radius is None:
        # Fail with a clear error instead of printing and then crashing on
        # the division below.
        raise ValueError('距离半径不可为空')
    radius_in_degrees = radius / 111300
    u = random.uniform(0.0, 1.0)
    v = random.uniform(0.0, 1.0)
    # sqrt(u) scales the distance so points are spread evenly over the disc
    # area rather than clustering near the centre.
    w = radius_in_degrees * math.sqrt(u)
    t = 2 * math.pi * v
    x = w * math.cos(t)
    y = w * math.sin(t)
    longitude = y + base_log
    latitude = x + base_lat
    # Keep six decimal places.
    loga = '%.6f' % longitude
    lata = '%.6f' % latitude
    return loga, lata

def delivery_report(err, msg):
    """Delivery callback: append one line per produced message to a log file.

    A non-None *err* is written to ``failed.log``; otherwise the topic and
    partition of *msg* are written to ``Message_delivery.log``.
    Triggered by poll() or flush().
    """
    failed = err is not None
    path = 'failed.log' if failed else 'Message_delivery.log'
    with open(path, 'a') as sink:
        if failed:
            record = str(err)
        else:
            record = 'Message delivered to {} [{}]'.format(msg.topic(), msg.partition())
        sink.write(record + '\n')

def c_kafka(*args, **kwds):
    """Produce synthetic login records to Kafka and queue an API payload for each.

    Positional arguments (exactly six, in this order):
        topic: Kafka topic to produce to.
        pool_data: Queue that receives ``{'key': [timestamp, json_text]}``
            for every produced record.
        sleep_time: Pause after each record; values > 0 sleep for
            ``int(sleep_time)`` seconds.
        count: Number of records to produce.
        lock: Accepted for signature compatibility; not used here.
        log_grade: Level name passed to ``logs``.

    Keyword arguments are passed unchanged, as one dict, to
    ``confluent_kafka.Producer``.

    Returns:
        ``[processId, elapsed_seconds, count]``.

    Raises:
        Whatever ``Producer(...)`` raises; the failure is logged first.
    """
    processId = os.getpid()
    print('Process', processId)
    topic, pool_data, sleep_time, count, lock, log_grade = args
    log = logs(log_grade, __name__)
    starttotal = timeit.default_timer()
    try:
        p = Producer(kwds)
    except Exception as e:
        log.error('Process- {} -kafka:连接失败-{}'.format(processId, e))
        # Without a producer the loop below cannot run; surface the failure
        # to the caller instead of crashing later on an unbound name.
        raise
    for _ in range(count):
        start = timeit.default_timer()
        nowtime = time.time()
        nowtimeapi = datetime.datetime.fromtimestamp(nowtime).strftime("%Y-%m-%d %H:%M:%S")
        # A fresh generator per record, so every record draws from a
        # pristine prefix table.
        ip = CreateIp()
        # Draw the province from the table itself so it is always a known key.
        prov = random.choice(list(ip.ipv4_prov_prefix))
        roam_type = random.randint(0, 1)
        # One call so latitude and longitude describe the same point.
        lon, lat = generate_random_gps()
        cdr = {
                "titleName": "01",
                "mobile": create_phone(),
                "provinceCode": "013",
                "cityCode": "130",
                "netType": "112_3001",
                "loginTime": nowtime,
                "loginType": "01",
                "loginState": "01",
                "imei": '0'+create_phone(),
                "userIp": ip.create_ipv4(roam_type, prov),
                "appid": "ppp111",
                "iccid": "ppp111",
                "imsi": "ppp111",
                "mac": "F4:BF:80:0E:25:6F",
                "meid": "ppp111",
                "lat": lat,
                "lon": lon,
                "deviceBrand": "HUAWEI",
                "deviceModel": "HMA-AL00",
                "os": "android",
                "osVersion": "android8.2biiiopp",
                "screen": "2244*1080",
                "memorySpace": "1.61 GB",
                "phoneSpace": "52.92 GB",
                "version": "android@8.2buuupouu"
            }
        api_data = {
                    "UNI_BSS_HEAD": {
                        "APP_ID": "tyfkpAPPID",
                        "TIMESTAMP": "{}.429".format(nowtimeapi),
                        "TRANS_ID": "2021811061409363562951",
                        "TOKEN": "6dc6f60246bf79cfc2c513fea5194402"
                    },
                    "UNI_BSS_BODY": {
                        "LOGIN_USER_RISK_CONTROL_REQ": {
                            "USER_ID": cdr["mobile"],
                            "RISK_CONTROL_CODE": "PloyEventIdCE001",
                            "HANDLE_TIME": nowtimeapi,
                            "USER_IP": cdr["userIp"],
                            "LON": cdr["lon"],
                            "LAT": cdr["lat"],
                            "IMEI": cdr["imei"]
                        }
                    }
                }
        json_api = json.dumps(api_data, indent=4)
        log.info('Process- {} -Time consuming data construction:{}'.format(processId, timeit.default_timer() - start))
        # Serve delivery callbacks from earlier produce() calls.
        p.poll(0)
        p.produce(topic, json.dumps(cdr).encode('utf-8'), callback=delivery_report)
        putkafkatime = timeit.default_timer()
        queued = {
            'key': [putkafkatime, json_api]
        }
        # Flush before queueing so the record has been handed to the broker
        # before its API payload becomes visible to the consumers.
        p.flush()
        pool_data.put_nowait(queued)
        if sleep_time > 0:
            time.sleep(int(sleep_time))
    end = timeit.default_timer() - starttotal
    return [processId, end, count]

class Creat_Thread(threading.Thread):
    """Worker thread that takes queued API payloads and POSTs each to ``url``.

    Queue items have the shape ``{'key': [enqueue_timestamp, json_text]}``.
    An item is posted only once it is at least ``sleeptime`` seconds old;
    younger items are put back on the queue. The thread stops after the
    queue has been found empty more than ``threadsleeptime`` times in a row
    (one second apart).
    """

    def __init__(self, t_msg, pool_data, lock, url, sleeptime, threadsleeptime, log_grade):
        """Store the configuration; see the class docstring for the meaning.

        Args:
            t_msg: Caller-chosen thread label used in log messages.
            pool_data: Queue to read payloads from.
            lock: Stored on the instance; not used by ``run``.
            url: Endpoint that receives the POST requests.
            sleeptime: Minimum age (seconds) of an item before it is posted.
            threadsleeptime: Number of consecutive empty checks tolerated.
            log_grade: Level name passed to ``logs``.
        """
        threading.Thread.__init__(self)
        self.t_msg = t_msg
        self.pool_data = pool_data
        self.lock = lock
        self.url = url
        self.sleeptime = sleeptime
        self.threadsleeptime = threadsleeptime
        self.log = logs(log_grade, __name__)

    def run(self):
        """Process queue items until the queue stays empty long enough."""
        # Local import: only this method needs the queue.Empty exception.
        import queue

        headers = {
            'Content-Type': 'application/json',
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:90.0) Gecko/20100101 Firefox/90.0',
            'Accept-Language': 'zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2'
        }
        start = timeit.default_timer()
        # Counts consecutive empty checks only; reset whenever an item arrives.
        idle_checks = 0
        while True:
            if self.pool_data.empty():
                idle_checks += 1
                if idle_checks > self.threadsleeptime:
                    self.log.error('Thread- {} - All items have been taken off the queue'.format(self.t_msg))
                    break
                self.log.warning('Thread- {} - Wait for 1 second when the queue is empty'.format(self.t_msg))
                time.sleep(1)
                continue

            try:
                dataFromQueue = self.pool_data.get_nowait()
            except queue.Empty:
                # Another thread took the item between empty() and get_nowait().
                continue
            idle_checks = 0

            enqueued_at, payload = dataFromQueue['key']
            age = timeit.default_timer() - enqueued_at
            if age < self.sleeptime:
                # Too young to post yet: hand it back for a later pass.
                self.pool_data.put_nowait(dataFromQueue)
                self.log.warning('Thread- {} - Rewrite queue'.format(self.t_msg))
                continue

            self.log.warning('Thread- {} - Call interval:{}'.format(self.t_msg, age))
            call_start = timeit.default_timer()
            try:
                rsp = requests.post(url=self.url, data=payload, headers=headers, verify=False, timeout=10)
            except Exception as e:
                # Best effort: one failed call must not stop the worker.
                self.log.error('Thread- {} - Interface call failed:{}'.format(self.t_msg, e))
                rsp = None
            self.log.info('Thread- {} - Asynchronous call time consuming:{}'.format(
                self.t_msg, timeit.default_timer() - call_start))
            if rsp is None:
                continue

            try:
                rsp_body = rsp.json()
            except ValueError as e:
                self.log.error('Thread- {} - Response is not valid JSON:{}'.format(self.t_msg, e))
                continue
            print(json.dumps(rsp_body, indent=4, ensure_ascii=False))
            response = {
                'msg': payload,
                'rsp': rsp_body
            }
            with open('response', 'a') as fobj:
                fobj.write(json.dumps(response, indent=4) + '\n')
        self.log.error('Thread- {} - Total asynchronous call time:{}'.format(self.t_msg, timeit.default_timer() - start))

def thread_run(thread_num, pool_data, lock, url, sleeptime, threadsleeptime, log_grade):
    """Start *thread_num* ``Creat_Thread`` workers and wait for all of them.

    Workers are labelled 1..thread_num. The remaining arguments are passed
    unchanged to every ``Creat_Thread``. Returns ``None`` once every worker
    has finished.
    """
    threads = []
    for t_msg in range(1, thread_num + 1):
        thread = Creat_Thread(t_msg, pool_data, lock, url, sleeptime, threadsleeptime, log_grade)
        thread.start()
        threads.append(thread)
    # Wait explicitly so the call only returns when the work is done,
    # rather than relying on interpreter shutdown to join the threads.
    for thread in threads:
        thread.join()

if __name__ == "__main__":
    # Script entry point: (processes - 1) pool workers produce records to
    # Kafka while one more pool worker runs the API-calling threads.
    print('父进程', os.getpid())
    print('Resource preparation in progress')
    # ------------------------------- log level -----------------------------------#
    log_grade = 'DEBUG'

    # ------------------------------- process configuration -----------------------#
    count_ms = 200  # total number of messages
    processes = 10  # pool size; must be at least 2 (producers + 1 consumer)
    if processes < 2:
        raise ValueError('processes must be at least 2, got {}'.format(processes))
    producers = processes - 1
    # Even share per producer; the last producer also takes the remainder.
    count, remainder = divmod(count_ms, producers)
    manager = mp.Manager()
    pool_data = manager.Queue()
    lock = manager.Lock()

    # ------------------------------- Kafka configuration -------------------------#
    sleep_time = 0  # pause after each produced message
    topic = 'topicname'
    # NOTE(review): credentials are hard-coded here; consider loading them
    # from the environment or a config file.
    conf = {
        "bootstrap.servers": "192.18.0.82:3007",
        "security.protocol": "SASL_PLAINTEXT",
        "sasl.mechanisms": "SCRAM-SHA-256",
        "sasl.username": "test",
        "sasl.password": "text",
        'queue.buffering.max.kbytes': 2000000,
        'queue.buffering.max.messages': 1000000
    }

    # ----------------------------- API caller configuration ----------------------#
    thread_num = 100  # number of API-calling threads
    url = 'http://192.168.1.1:5555/'
    sleeptime = 5  # minimum age of a message before the API is called
    threadsleeptime = 10  # how long a thread waits on an empty queue
    # -----------------------------------------------------------------------------#
    result = []
    pool = mp.Pool(processes=processes)
    for p_name in range(producers):
        is_last = p_name == producers - 1
        share = count + remainder if is_last else count
        result.append(pool.apply_async(func=c_kafka,
                                       args=(topic, pool_data, sleep_time, share, lock, log_grade),
                                       kwds={**conf}))
    consumer = pool.apply_async(func=thread_run,
                                args=(thread_num, pool_data, lock, url, sleeptime, threadsleeptime, log_grade))
    pool.close()
    pool.join()
    for res in result:
        process, rtime, sent = res.get()
        print("process(%s) done. --Running time: %s Seconds" % (process, rtime), sent)
    # Surface any exception raised inside the consumer task; without this
    # call a failure there would pass unnoticed.
    consumer.get()


版权声明:本文为qq_26679243原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。