python3 协程 写同一个文件_python协程 -- 动态添加协程任务asyncio.run_coroutine_threadsafe...

使用asyncio.run_coroutine_threadsafe动态添加协程任务到事件循环中

一. 官方文档

根据asyncio的文档介绍,asyncio的事件循环不是线程安全的,一个event loop只能在一个线程内调度和执行任务,

并且同一时间只有一个任务在运行,当程序调用get_event_loop获取event loop时,会从一个本地的Thread Local对象获取属于当前线程的event loop

run_coroutine_threadsafe内部用到了call_soon_threadsafe:

event loop内部会维护着一个self-pipe,它由一对socketpair组成,_write_to_self的作用就是把一个信号写到self-pipe的一端.

这样一来,event loop在检测到self-pipe发生事件后,就会响应并唤醒事件循环来处理任务

asyncio.run_coroutine_threadsafe(coro, loop)

1. 此方法提交一个协程任务到循环中,loop作为参数

2. 返回Futures供查询结果

3. 当事件循环运行时, 必须在不同线程下添加协程任务到此循环中

二. 简单例子

由于运行事件循环会阻塞当前线程, 为满足动态添加协程任务的需求

故将事件循环放在次线程中运行, 在主线程中创建协程任务, 然后提交到次线程里的循环中

#!/usr/bin/env python

# -*- coding: utf-8 -*-

# @File : test_coroutine.py

# @Author: Wade Cheung

# @Date : 2018/12/9

# @Desc : 动态添加协程任务

import asyncio

from threading import Thread

async def create_task(event_loop):

i = 0

while True:

# 每秒产生一个任务, 提交到线程里的循环中, event_loop作为参数

asyncio.run_coroutine_threadsafe(production(i), event_loop)

await asyncio.sleep(1)

i += 1

async def production(i):

while True:

print("第{}个coroutine任务".format(i))

await asyncio.sleep(1)

def start_loop(loop):

# 运行事件循环, loop作为参数

asyncio.set_event_loop(loop)

loop.run_forever()

thread_loop = asyncio.new_event_loop() # 创建事件循环

run_loop_thread = Thread(target=start_loop, args=(thread_loop,)) # 新起线程运行事件循环, 防止阻塞主线程

run_loop_thread.start() # 运行线程,即运行协程事件循环

main_loop = asyncio.new_event_loop()

main_loop.run_until_complete(create_task(thread_loop)) # 主线程负责create coroutine object

参考: