
封面图片来源:沙沙野
内容概览
- 进程池
- 进程池和多进程的性能测试
- 进程池的其他机制
- 进程池的回调函数
进程池
- 如果有多少个任务,就开启多少个进程,实际上并不划算。由于计算机的 cpu 个数是非常有限的因此开启的进程数量完全和 cpu 个数成比例
- 进程池的特点:同一时间最多有多少个进程能够同时执行任务与进程池中进程的个数有关系
- 如果进程池设置了个数,那么程序只会开启四个进程运行
from multiprocessing import Pool
import os
def func(i):
print(i, os.getpid())
if __name__ == "__main__":
p = Pool(4)
for i in range(10):
# async 异步的提交任务
p.apply_async(func, args=(i,))
# 关闭池子,不是回收池子中的进程,而是阻止继续提交任务
p.close()
# 阻塞,直到池子中的任务都执行完毕
p.join()
# 运行结果:发现即使重新运行多次,最多也就出现四个进程
0 4866
1 4867
3 4866
2 4868
4 4867
5 4866
7 4866
8 4867
6 4869
9 4868
# p.apply_async 要与 close/join 配合使用
# 保证主进程等待所有进程池中的任务执行完毕
# ret = p.apply_async(...)
# 在提交所有任务之后,从 ret 中获取结果(函数的返回值)
# 也能够让主进程等待池中任务完成
进程池和多进程的性能测试
- 起多进程的意义
- 为了更好的利用 cpu,所以程序中都是网络 IO 或文件 IO 就不适合用多进程,就好比多人聊天,如果 1000 人同时聊天,服务器不可能开启 1000 个进程池,不然挂掉
- 为了数据的隔离,如果程序中总是要数据共享,那么就不适合使用多进程
- 超过了 cpu 个数的任务数,都应该使用进程池来解决问题,而不能无限开启子进程
2. 进程池和多进程的性能测试示例
import os
import time
from multiprocessing import Process, Pool
def func(i):
print(i, os.getpid())
if __name__ == "__main__":
start = time.time()
p_lst = []
for i in range(5):
p = Process(target=func, args=(i,))
p.start()
p_lst.append(p)
for p in p_lst:
p.join()
end = time.time()
pro_time = end - start
start = time.time()
p = Pool(4)
for i in range(5):
p.apply_async(func, args=(i,))
p.close()
p.join()
end = time.time()
pool_time = end - start
print("多进程所需时间:%s" % pro_time)
print("进程池所需时间:%s" % pool_time)
# 运行结果:
0 16738
1 16739
2 16740
3 16741
4 16742
0 16743
1 16744
2 16745
4 16743
3 16746
多进程所需时间:0.006443500518798828
进程池所需时间:0.11818265914916992
进程池的其他机制
- map() 的用法
import os
import time
from multiprocessing import Pool
def func(i):
time.sleep(0.1)
print(i,os.getpid())
if __name__ == '__main__':
p = Pool(4)
p.map(func,range(5))
# 运行结果:
0 17333
2 17335
1 17334
3 17336
4 17333
# 这里 p.map(func,range(5)) 相当于:
for i in range(5):
p.apply_async(func, args=(i,))
p.close()
p.join()
2. 进程池里面可以使用 get() 得到 func() 的返回值
import os
import time
from multiprocessing import Pool
def func(i):
time.sleep(1)
print(i,os.getpid())
return i**i
if __name__ == '__main__':
p = Pool(4)
ret_lst = []
for i in range(10):
ret = p.apply_async(func,args=(i,))
ret_lst.append(ret)
p.close()
p.join()
for ret in ret_lst:
print(ret.get())
# 运行结果:
0 17517
2 17519
1 17518
3 17520
4 17517
6 17518
5 17519
7 17520
9 17517
8 17518
1
1
4
27
256
3125
46656
823543
16777216
387420489
进程池的回调函数
- 大概思路
- 将 n 个任务交给 n 个进程去执行
- 每一个进程在执行完毕之后会有一个返回值,这个返回值会直接交给 callback 参数指定的那个函数去进行处理
- 这样的话,所有的进程 哪一个执行的最快,哪一个就可以先进性统计工作,能在最短的时间内得到结果
2. 回调函数用来保证一个任务在执行完毕之后,立刻发出 callback 回调函数中的内容 并且子进程的函数返回值作为 callback 函数的参数
import time
import random
from multiprocessing import Process,Pool
# 进程池的子进程执行这个函数的
def get(i):
time.sleep(random.random())
print('从网页获取一个网页的内容', i)
return i,'网页的内容'*i
# 主进程执行这个函数
def call_back(content):
print(content)
if __name__ == '__main__':
p = Pool(5)
ret_l = []
for i in range(10):
p.apply_async(get,args=(i,),callback=call_back)
p.close()
p.join()
# 运行结果:
从网页获取一个网页的内容 4
(4, '网页的内容网页的内容网页的内容网页的内容')
从网页获取一个网页的内容 0
(0, '')
从网页获取一个网页的内容 3
(3, '网页的内容网页的内容网页的内容')
从网页获取一个网页的内容 6
(6, '网页的内容网页的内容网页的内容网页的内容网页的内容网页的内容')
从网页获取一个网页的内容 1
(1, '网页的内容')
从网页获取一个网页的内容 8
(8, '网页的内容网页的内容网页的内容网页的内容网页的内容网页的内容网页的内容网页的内容')
从网页获取一个网页的内容 5
(5, '网页的内容网页的内容网页的内容网页的内容网页的内容')
从网页获取一个网页的内容 2
(2, '网页的内容网页的内容')
从网页获取一个网页的内容 9
(9, '网页的内容网页的内容网页的内容网页的内容网页的内容网页的内容网页的内容网页的内容网页的内容')
从网页获取一个网页的内容 7
(7, '网页的内容网页的内容网页的内容网页的内容网页的内容网页的内容网页的内容')
版权声明:本文为weixin_39658759原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。