python_异步爬虫asyncio/aiohttp

一、基础概念(阻塞/同步/异步)

  • 阻塞:指程序未得到所需计算资源时被挂起的状态,程序在等待某个操作完成期间,自身无法继续干别的事情,比如你打哈欠的时候不能呼吸,阻塞对于程序来说往往伴随着运行耗时增加与效率低下
  • 非阻塞:指程序在等待某操作的过程中,自身不被阻塞,可以继续干别的事情,比如煲汤的时候可以刷牙洗脸
  • 同步:就是相当于顺序执行
  • 异步:为了完成某个任务,不同程序单元之间无须通信协调也能完成任务,此时不相关的程序单元之间可以是异步的,无序,如不同网页的下载、保存操作是无关的
  • 多进程:利用CPU多核优势,提高执行效率
  • 协程:本质上是单进程,可以实现异步操作,我们发送一个请求之后,需要等待一定时间才能响应,但其实在这个等待的过程中,程序还可以干许多其它事情,等得到响应之后再切换回来继续处理,这样可以充分利用CPU和其它资源

二、python协程库asyncio

  • import asyncio
  • event_loop:事件循环,相当于一个无限循环,我们可以把些函数注册到这个事件循环上,当满足条件发生的时候,就会调用对应的处理方法
  • coroutine:中文翻译叫协程,在 Python 中常指代为协程对象类型,我们可以将协程对象注册到时间循环中,它会被事件循环调用。我们可以使用 async关键字来定义一个方法,这个方法在调用时不会立即被执行,而是返回一个协程对象
  • task:任务,它是对协程对象的进一步封装,包含了任务的各个状态,比如running/finished等,我们可以利用这些状态获取协程对象的执行情况
  • future:代表将来执行或没有执行的任务的结果,实际上和task没有本质区别。
  • 参考文章链接
  • 总结:
    • async定义的方法返回的是无法直接执行的协程对象
    • loop对象的run_until_complete()方法才是真正启动协程执行的开始,可接收协程对象或者task对象
    • await可以将当前协程挂起,转而执行其它的协程,可接收协程对象
    • asyncio.get_event_loop()生成一个事件循环loop对象
    • asyncio.ensure_future(协程对象)生成一个task任务对象,其中task任务对象可以返回执行的状态
    • 如果返回的是一个协程对象(如async修饰的方法),那么前面就要加await
1、定义协程
  • (1)使用async定义了一个execute方法,使得该方法成为一个无法直接执行的协程对象
  • (2)调用该方法并没有执行,而是返回一个coroutine协程对象
  • (3)使用get_event_loop()方法创建一个事件循环loop
  • (4)调用loop对象的run_until_complete()方法将协程注册到事件循环loop中,然后启动,此时猜看到execute方法打印出了接收的数字
    import asyncio
    async def execute(x):
        print('Number:', x)
    coroutine = execute(1)
    print('Coroutine:', coroutine)
    print('After calling execute')
    loop = asyncio.get_event_loop()
    loop.run_until_complete(coroutine)
    print('After calling loop')
    # Coroutine: <coroutine object execute at 0x0000000002919E48>
    # After calling execute
    # Number: 1
    # After calling loop
    
  • (5)定义task对象,调用loop对象的create_task()方法coroutine对象转化为task对象,随后我们打印输出一下,发现它是 pending 状态。接着我们将 task 对象添加到事件循环中得到执行,随后我们再打印输出一下 task 对象,发现它的状态就变成了 finished
    import asyncio
    async def execute(x):
        print('Number:', x)
        return x
    coroutine = execute(1)
    print('Coroutine:', coroutine)
    print('After calling execute')
    loop = asyncio.get_event_loop()
    task = loop.create_task(coroutine)
    print('Task:', task)
    loop.run_until_complete(task)
    print('Task:', task)
    print('After calling loop')
    # Coroutine: <coroutine object execute at 0x0000000002919E48>
    # After calling execute
    # Task: <Task pending coro=<execute() running at E:/project/python基础.py:4>>
    # Number: 1
    # Task: <Task finished coro=<execute() done, defined at E:/project/python基础.py:4> result=1>
    # After calling loop
    
  • (6)定义task对象的另一种方式通过asyncio的ensure_future()方法,返回的也是task对象
    import asyncio
    async def execute(x):
        print('Number:', x)
        return x
    coroutine = execute(1)
    print('Coroutine:', coroutine)
    print('After calling execute')
    task = asyncio.ensure_future(coroutine)
    print('Task:', task)
    loop = asyncio.get_event_loop()
    loop.run_until_complete(task)
    print('Task:', task)
    print('After calling loop')
    # Coroutine: <coroutine object execute at 0x0000000002929E48>
    # After calling execute
    # Task: <Task pending coro=<execute() running at E:/project/python基础.py:2>>
    # Number: 1
    # Task: <Task finished coro=<execute() done, defined at E:/project/python基础.py:2> result=1>
    # After calling loop
    
2、绑定回调
  • (1)调用add_done_callback()方法为某个task绑定一个回调方法。我们将 callback() 方法传递给了封装好的 task 对象,这样当 task 执行完毕之后就可以调用 callback() 方法了,同时 task 对象还会作为参数传递给 callback() 方法,调用 task 对象的 result() 方法就可以获取返回结果了;当协程对象执行完毕后,去执行声明里面的callback方法
    import asyncio
    import requests
    async def request():
        url = 'https://www.baidu.com'
        status = requests.get(url)
        return status
    def callback(task):
        print('Status:', task.result())
    coroutine = request()
    task = asyncio.ensure_future(coroutine)
    task.add_done_callback(callback)
    print('Task:', task)
    loop = asyncio.get_event_loop()
    loop.run_until_complete(task)
    print('Task:', task)
    # Task: <Task pending coro=<request() running at E:/project/python基础.py:4> cb=[callback() at E:/project/python基础.py:8]>
    # Status: <Response [200]>
    # Task: <Task finished coro=<request() done, defined at E:/project/python基础.py:4> result=<Response [200]>>
    
  • (2)直接调用task运行完毕之后直接调用result()方法获取结果
    import asyncio
    import requests
    async def request():
        url = 'https://www.baidu.com'
        status = requests.get(url)
        return status
    coroutine = request()
    task = asyncio.ensure_future(coroutine)
    print('Task:', task)
    loop = asyncio.get_event_loop()
    loop.run_until_complete(task)
    print('Task:', task)
    print('Task Result:', task.result())
    # Task: <Task pending coro=<request() running at E:/project/python基础.py:3>>
    # Task: <Task finished coro=<request() done, defined at E:/project/python基础.py:3> result=<Response [200]>>
    # Task Result: <Response [200]>
    
3、多任务协程
  • (1)定义一个task列表,然后使用asyncio的wait()方法即可执行;我们使用一个 for 循环创建了五个 task,组成了一个列表,然后把这个列表首先传递给了 asyncio 的 wait() 方法,然后再将其注册到时间循环中,就可以发起五个任务了。最后我们再将任务的运行结果输出出来;最终5个任务顺序执行
    import asyncio
    import requests
    async def request():
        url = 'https://www.baidu.com'
        status = requests.get(url)
        return status
    tasks = [asyncio.ensure_future(request()) for _ in range(5)]
    print('Tasks:', tasks)
    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.wait(tasks))
    for task in tasks:
        print('Task Result:', task.result())
    # Tasks: [<Task pending coro=<request() running at E:/project/python基础.py:3>>, <Task pending coro=<request() running at E:/project/python基础.py:3>>, <Task pending coro=<request() running at E:/project/python基础.py:3>>, <Task pending coro=<request() running at E:/project/python基础.py:3>>, <Task pending coro=<request() running at E:/project/python基础.py:3>>]
    # Task Result: <Response [200]>
    # Task Result: <Response [200]>
    # Task Result: <Response [200]>
    # Task Result: <Response [200]>
    # Task Result: <Response [200]>
    
4、协程实现(未合理)
  • (1)使用 await 可以将耗时等待的操作挂起,让出控制权。当协程执行的时候遇到 await,时间循环就会将本协程挂起,转而去执行别的协程,直到其他的协程挂起或执行完毕。await后面可以跟一个协程对象,所以requests的的请求用async单独定义方法; 如下代码仅仅只是将涉及IO操作的代码封装到async修饰了,但并没有节省很多时间,只有使用支持异步操作的请求方法才可以实现真正的异步,比如aiohttp
    import asyncio
    import requests
    import time
    start = time.time()
    async def get(url):
        return requests.get(url)
    async def request():
        url = 'https://www.baidu.com'
        print('Waiting for', url)
        response = await get(url)
        print('Get response from', url, 'Result', response.status_code)
    tasks = [asyncio.ensure_future(request()) for _ in range(5)]
    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.wait(tasks))
    end = time.time()
    print('Cost time:', end - start)
    # Waiting for https://www.baidu.com
    # Get response from https://www.baidu.com Result 200
    # Waiting for https://www.baidu.com
    # Get response from https://www.baidu.com Result 200
    # Waiting for https://www.baidu.com
    # Get response from https://www.baidu.com Result 200
    # Waiting for https://www.baidu.com
    # Get response from https://www.baidu.com Result 200
    # Waiting for https://www.baidu.com
    # Get response from https://www.baidu.com Result 200
    # Cost time: 0.45502614974975586
    

三、aiohttp的使用

1、利用aiohttp实现真正意义上的协程
  • pip install aiohttp是一个支持异步请求的库,利用它和asyncio配合我们可以非常方便的实现异步请求操作。aiohttp官方文档此次就实现了相同时间内实现多个请求的过程

    import asyncio
    import aiohttp
    import time
    start = time.time()
    async def get(url):
        session = aiohttp.ClientSession()
        response = await session.get(url)
        result = await response.text()
        await session.close()
        return result
    async def request():
        url = 'http://www.newsmth.net/nForum/#!mainpage'
        print('Waiting for', url)
        result = await get(url)
        print('Get response from', url, 'Result:', result)
    tasks = [asyncio.ensure_future(request()) for _ in range(5)]
    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.wait(tasks))
    end = time.time()
    print('Cost time:', end - start)
    

    在这里插入图片描述

  • 更多详细细节看这篇文章

2、aiohttp样例
  • 前面介绍的asyncio模块,内部实现了对TCP、UDP、SSL协议的异步操作,但对于http请求来说,就需要使用aiohttp实现
  • aiohttp是一个基于asyncio的异步HTTP网络模块,它提供了服务端,又提供了客户端
    import aiohttp
    import asyncio
    
    
    async def main():
        data = {'name': 'germey', 'age': 25}
        timeout = aiohttp.ClientTimeout(total=10)  # 设置超时时间
        async with aiohttp.ClientSession(timeout=timeout) as session:
            # async with session.post('https://httpbin.org/post', params=data) as response:
            # async with session.post('https://httpbin.org/post', json=data) as response:
            async with session.post('https://httpbin.org/post', data=data) as response:
                print('status:', response.status)
                print('headers:', response.headers)
                print('body:', await response.text())  # 如果返回是一个协程对象,则前面加await
                print('bytes:', await response.read())
                print('json:', await response.json())
    
    
    if __name__ == '__main__':
        asyncio.get_event_loop().run_until_complete(main())
        # asyncio.run(main())  # python3.7版本以上
    
  • asyncio的Semaphore来控制并发量,实例代码如下:
    import aiohttp
    import asyncio
    
    
    CONCURRENCY = 5
    URL = 'https://www.baidu.com'
    semaphore = asyncio.Semaphore(CONCURRENCY)
    session = None
    
    
    async def scrape_api():
        async with semaphore:
            print("scraping", URL)
            async with session.get(URL) as response:
                await asyncio.sleep(1)
                return await response.text()
    
    
    async def main():
        global session
        session = aiohttp.ClientSession()
        scrape_index_tasks = [asyncio.ensure_future(scrape_api()) for _ in range(100)]
        await asyncio.gather(*scrape_index_tasks)
    
    
    if __name__ == '__main__':
        asyncio.get_event_loop().run_until_complete(main())
        # asyncio.run(main())  # python3.7版本以上
    
    
  • 更多实战案例代码Demo

版权声明:本文为weixin_43411585原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。