Python的线程池之ThreadPoolExecutor

Python中的多线程

一段程序运行起来的本质就是有线程在负责执行我们的代码;那么多线程即是有多个线程在执行一段程序。

一般CPU计算密集型任务适合多进程,那么IO密集型任务则适合多线程;当然一个进程可拥有多个并行的线程,当中每一个线程,共享当前进程的资源。

提示:那么是不是一味的创建多个线程就能提高程序的效率?非也, 只有合理的的多线程才能发挥多核的优势;于是乎就有了线程池。

Python中的线程池

线程池可以可以解决多个线程的痛点

  • 创建合理的线程数量,重用存在的线程,减少线程创建销毁带来的开销。
  • 可有效的控制最大并发线程数,提高系统资源的使用率,同时避免过多资源竞争,避免堵塞。
  • 提供定时执行、定期执行、单线程、并发数控制等功能。

接着看Python主流的线程池ThreadPoolExecutor

线程池ThreadPoolExecutor

ThreadPoolExecutor位于python3标准库的并发包(concurrent.futures)下,特点

  1. 主线程可以获取某一个线程的状态,以及返回值。
  2. 线程同步
  3. 让多线程和多进程的编码接口一致。
  4. 简单粗暴

它的一些API使用:

# 创建一个包含2条线程的线程池
with ThreadPoolExecutor(max_workers=2) as pool:
# 另一种方式
pool = ThreadPoolExecutor(max_workers=2)
# 向线程池提交一个task, 20会作为action()函数的参数
task = pool.submit(action, 20)
# 向线程池再提交一个task, 30会作为action()函数的参数
task2 = pool.submit(action, 30)
# 判断task代表的任务是否结束
print(task.done())
time.sleep(3)
# 判断task2代表的任务是否结束
print(task2.done())
# 查看task代表的任务返回的结果
print(task.result())
# 查看task2代表的任务返回的结果
print(task2.result())
# 关闭线程池
pool.shutdown()

接着需要考虑的就是如果提交的任务太多,处理不及时,则导致占用太多内存,通过查看源码可以清楚的看到它默认是使用的无界队列来存储未执行的任务,源码如下:


class ThreadPoolExecutor(_base.Executor):

    # Used to assign unique thread names when thread_name_prefix is not supplied.
    _counter = itertools.count().__next__

    def __init__(self, max_workers=None, thread_name_prefix='',
                 initializer=None, initargs=()):
        """Initializes a new ThreadPoolExecutor instance.

        Args:
            max_workers: The maximum number of threads that can be used to
                execute the given calls.
            thread_name_prefix: An optional name prefix to give our threads.
            initializer: A callable used to initialize worker threads.
            initargs: A tuple of arguments to pass to the initializer.
        """
        ##省略一些代码

        self._max_workers = max_workers
        self._work_queue = queue.SimpleQueue()
        self._idle_semaphore = threading.Semaphore(0)
        self._threads = set()
        self._broken = False
        self._shutdown = False
        self._shutdown_lock = threading.Lock()
        self._thread_name_prefix = (thread_name_prefix or
                                    ("ThreadPoolExecutor-%d" % self._counter()))
        self._initializer = initializer
        self._initargs = initargs

其中,由源码可知,SimpleQueue 是不限制队列大小的

self._work_queue = queue.SimpleQueue()

截至 Python 3.6.6,concurrent.futures.ThreadPoolExecutor 中用的都是无·界队列,在生产速度大于消费速度时,可能会耗光系统资源,希望找到一种方式避免这种情况。

参考以下两种方案(参考):

方案一

继承 ThreadPoolExecutor,直接修改 _work_queue 为有界队列。

优点:简单粗暴直接。

缺点:修改了私有属性。

示例:python - ThreadPoolExecutor: how to limit the queue maxsize? - Stack Overflow

from concurrent import futures
import queue

class ThreadPoolExecutorWithQueueSizeLimit(futures.ThreadPoolExecutor):
    def __init__(self, maxsize=50, *args, **kwargs):
        super(ThreadPoolExecutorWithQueueSizeLimit, self).__init__(*args, **kwargs)
        self._work_queue = queue.Queue(maxsize=maxsize)

方案二

将任务分组,每组完成后再提交下一组。

优点:无需继承,不改变私有属性;能切实有效的避免资源耗费。

缺点:组之间的任务是同步的,未充分利用线程,即使有剩余任务也有空闲线程存在。

示例:python - ThreadPoolExecutor: how to limit the queue maxsize? - Stack Overflow


版权声明:本文为weixin_37848710原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。