利用python进行性能测试(下)

#获取内存使用率方法。生成一个shell脚本并上传到服务器 
def getmem():
    global host
    script=
   '''
    #!/bin/bash
while true
`    do
        free|awk '/Mem/{print  '"\\"$(date +%Y-%m-%d" "%T)\\""'","$2/$3}'
        sleep 5
    done
    '''
    file_script = open('getmem.sh', 'w')
    file_script.write(script)
    file_script.close( )
    logging.info("monitor memory by shell script: getmem.sh")
    logging.info(script)
    for host in HOST:
        sftpfile(host,None,['getmem.sh'])
#测试完成后清理sar iostat,和获得cpu数据脚本进程。并加工结果文件。    
def teardownps():
    cmd = ['killall sar',
           "ps -ef|grep getmem|grep -v grep|awk '{print $2}'|xargs kill -9",
           "df -h|awk '{print $4\",\"$5\",\"$6}' > ~/diskend.$(hostname).txt",
#           'df -h > ~/disk_end$(hostname).txt',
           'killall iostat',
           'sed -i -e "s/all//g" -e "/CPU/d" -e "/^$/d"  -r -e "s/\s{2,}/,/g" -e  "s/^/$(date +%Y-%m-%d) /g" cpu*.txt',
           'sed -i -e "/^$/d" -r -e "s/\s{2,}/,/g" -e  "s/^/$(date +%Y-%m-%d) /g" net*.txt']
    for host in HOST:
        ssh2(host,cmd)

#获取结果文件
def getresult():

    global data_path
    data_path=os.path.join(os.getcwd(),"data")
    for host in HOST:
        resultfile=ssh2(host,['ls cpu*.txt mem*.txt disk*.txt io*.txt net*.txt'])
        sftpfile(host,resultfile,None)
        for f in resultfile:
            f=f.replace("\n", "")
            shutil.move(f,data_path)


#使用sar,iostat和脚本获取性能数据               
def monitor():
    getmem()
    os.environ["LC_TIME"]="POSIX"
    cmd = [
           'nohup sar 2 > ~/cpu$(hostname).txt &',
           'chmod u+x getmem.sh','nohup ~/getmem.sh >~/mem$(hostname).txt &',
           "df -k|awk '{print $4\",\"$5\",\"$6}' > ~/diskinit.$(hostname).txt",
#           'df -h > ~/disk_init$(hostname).txt',
           'nohup iostat -d -x -k 3 >~/io$(hostname).txt &',
           "nohup sar -n DEV 3 > ~/net$(hostname).txt &"]
    for host in HOST:
        ssh2(host,cmd)



def showCPU():
    global data_path,pic_path
    cpuindex=['time','user','nice','system','iowait','steal','idle']
    pattern = re.compile(r'cpu.*.txt')
    for cpudatafile in os.listdir(data_path):
        match = pattern.match(cpudatafile)
        if match:
            logging.info("handle "+cpudatafile) 
            #通过numpy.genfromtxt加载数据
            cpudata=np.genfromtxt(fname=os.path.join(data_path,cpudatafile),
                          names=cpuindex,
                          dtype='S19,float,float,float,float,float,float',
                          delimiter=',')
            logging.info(cpudata)
            pic=0  
            plt.figure(figsize=(8,12)) #指定图像大小              
            for index in cpuindex:       
                pic+=1
                if pic==1:
                    testime=cpudata[index].astype(datetime64).copy() #转换时间类型
                if pic>1:
                    logging.info("draw "+cpudatafile.split(".")[0]+" "+index+" ..................................")
                    plt.sca(plt.subplot(len(cpuindex),1,pic))
                    plt.setp(plt.xticks()[1], rotation=30, ha='right')
                    plt.subplots_adjust(left=0.08, right=0.95, wspace=0.25, hspace=1.10)
                    plt.plot(testime,cpudata[index], color='r', linewidth=1)
                    plt.xlabel("time:s")
                    plt.ylabel(index+":%")
                    plt.title("CPU used(%): "+index)
            logging.info("save the picture to "+os.path.join(pic_path,cpudatafile.split(".")[0]+".jpg"))        
            plt.savefig(os.path.join(pic_path,cpudatafile.split(".")[0]+".jpg")) #保存图像
            plt.close()   #必须close,否则会缓存到下个图像中

class testThread (threading.Thread):   #继承父类threading.Thread
    def __init__(self,testscene,runtimer,duration):
        threading.Thread.__init__(self)
        self.testscene=testscene
        self.runtimer=runtimer
        self.duration=duration

    def run(self):
        #把要执行的代码写到run函数里面 线程在创建后会直接运行run函数 
        #测试场景。把若干个测试用例加到里面组成测试场景。如果duration为None,则表示一次性并发。
        #否则为持续并发测试。持续时间duration单位为秒。
        suite = unittest.TestSuite()     
        TestResult=unittest.TestResult()
        logging.info("run...............................................")
        logging.info("running times: "+str(self.runtimer))
        logging.info("running last(seconds): "+str(self.duration)) 
        logging.info("running testscene: "+str(self.testscene))        
        for testcase in self.testscene:
            suite.addTest(urunite_test_py.Test(testcase))
        if self.duration==None:
            for timer in range(self.runtimer):
                timer=timer+1
                suite.run(TestResult)            

        else:
            startime=time.mktime(time.localtime())
            cost=0
            while cost<=self.duration:             
                for timer in range(self.runtimer):
                    timer=timer+1
                    suite.run(TestResult)
                endtime=time.mktime(time.localtime())
                cost=endtime-startime
                timer=1            

        passrate=100-len(TestResult.failures)/TestResult.testsRun*100       

        logging.info("fail the test: "+str(TestResult.failures))
        logging.info("total fail: "+str(len(TestResult.failures)))
        logging.info("total run: "+str(TestResult.testsRun))   
        logging.info("TestCases Pass Rate: "+str(passrate)+"%")


if __name__ == "__main__":
    if not os.path.exists("./picture"):
        os.mkdir("./picture")

    if not os.path.exists("./data"):
        os.mkdir("./data")
    else:
        os.rename("data","data"+time.strftime("%Y%m%d%H%M%S", time.localtime()))
        os.mkdir("./data")
    #定义测试用例,组成一个测试场景        
    testscene1=['testcase1','testcase2']
    #testscene1=['testcase3']
    testscene2=['testcase2','testcase3']
    #利用线程进行并发测试,三个参数分别表示测试场景、并发次数和持续时间,None表示一次并发
    thread1 = testThread(testscene1,40,None)
    thread2 = testThread(testscene2,100,100)

    monitor()
    thread1.start()
    thread1.join()
    teardownps()
    getresult()


    showCPU()

版权声明:本文为werder原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。