python 启动和关闭进程_python多进程启动和退出管理

参考文档

说明

由于python的GIL导致在处理计算密集型任务时,会比单进程处理还要慢,最好的方法是使用多进程来进行处理,每个进程处理任务的一部分。

代码

子进程启动和退出管理

import signal

from multiprocessing import Process

import os

import time

# 启动进程数设置为4

sub_process_count = 4

# 待处理任务列表

all_task_list = [x for x in range(20)]

# 子进程的SIGTERM信号处理函数

def on_sub_process_sigterm(signo, frame):

global sub_task_id

global run_flag

# 收到SIGTERM信号后修改运行标志,子进程处理函数循环将会退出,最后子进程安全退出

run_flag = False

# 子进程的处理函数

def sub_process_func(task_id):

global sub_task_id

global run_flag

sub_task_id = task_id

run_flag = True

pid = os.getpid()

print('[{}][{}] 子进程已启动'.format(sub_task_id, pid))

for task in all_task_list[task_id::sub_process_count]: #使用python的list的切片和步长属性,确保每个进程处理任务列表中的一部分

if not run_flag:

print('[{}][{}] 子进程收到退出标志 '.format(sub_task_id, pid))

break

print('[{}][{}] 子进程正在处理任务 {} '.format(sub_task_id, pid, task))

time.sleep(10)

print('[{}][{}] 子进程安全退出'.format(sub_task_id, pid))

# 主进程的SIGTERM处理函数

def on_main_process_sigterm(signo, frame):

global sub_process_list

# 给所有子进程发送SIGTERM信号

for (task_id, p) in sub_process_list:

p.terminate()

if __name__ == '__main__':

#存储所有的子进程

global sub_process_list

sub_process_list = []

# 注册SIGTERM信号捕捉函数,来管理子进程的退出

signal.signal(signal.SIGTERM, on_main_process_sigterm)

# 启动子进程

for task_id in range(sub_process_count):

p = Process(target=sub_process_func, args=(task_id,))

p.start()

sub_process_list.append( (task_id, p) )

print('所有子进程已经启动...')

print('输入下面的命令来退出所有进程:\nkill -SIGTERM {}'.format(os.getpid()))

# 等待子进程安全退出

for (task_id, p) in sub_process_list:

p.join()

print('主进程安全退出')

import signal

from multiprocessing import Process

import os

import time

# 启动进程数设置为4

sub_process_count = 4

# 待处理任务列表

all_task_list = [x for x in range(20)]

# 子进程的SIGTERM信号处理函数

def on_sub_process_sigterm(signo, frame):

global sub_task_id

global run_flag

# 收到SIGTERM信号后修改运行标志,子进程处理函数循环将会退出,最后子进程安全退出

run_flag = False

# 子进程的处理函数

def sub_process_func(task_id):

global sub_task_id

global run_flag

sub_task_id = task_id

run_flag = True

pid = os.getpid()

print('[{}][{}] 子进程已启动'.format(sub_task_id, pid))

for task in all_task_list[task_id::sub_process_count]: #使用python的list的切片和步长属性,确保每个进程处理任务列表中的一部分

if not run_flag:

print('[{}][{}] 子进程收到退出标志 '.format(sub_task_id, pid))

break

print('[{}][{}] 子进程正在处理任务 {} '.format(sub_task_id, pid, task))

time.sleep(10)

print('[{}][{}] 子进程安全退出'.format(sub_task_id, pid))

# 主进程的SIGTERM处理函数

def on_main_process_sigterm(signo, frame):

global sub_process_list

# 给所有子进程发送SIGTERM信号

for (task_id, p) in sub_process_list:

p.terminate()

if __name__ == '__main__':

#存储所有的子进程

global sub_process_list

sub_process_list = []

# 注册SIGTERM信号捕捉函数,来管理子进程的退出

signal.signal(signal.SIGTERM, on_main_process_sigterm)

# 启动子进程

for task_id in range(sub_process_count):

p = Process(target=sub_process_func, args=(task_id,))

p.start()

sub_process_list.append( (task_id, p) )

print('所有子进程已经启动...')

print('输入下面的命令来退出所有进程:\nkill -SIGTERM {}'.format(os.getpid()))

# 等待子进程安全退出

for (task_id, p) in sub_process_list:

p.join()

print('主进程安全退出')

任务队列切片说明

sub_process_count = 4

all_task_list = [x for x in range(20)]

def sub_process_func(task_id):

task_list = []

for task in all_task_list[task_id::sub_process_count]: #

print('{}: {}'.format(task_id, task))

task_list.append(task)

return task_list

if __name__ == '__main__':

all_task_list2 = []

for task_id in range(sub_process_count):

task_list = sub_process_func(task_id)

all_task_list2 += task_list

assert len(all_task_list2) == len(all_task_list) #任务数量是一致的

assert set(all_task_list2) == set(all_task_list) #任务长度也是一致的