Python异步模块asyncio和aiohttp

异步 IO,就是你发起一个 IO 操作,却不用等它结束,你可以继续做其他事情,当它结束时,你会得到通知。

可交给 asyncio 执行的任务,称为协程(coroutine)。一个协程可以放弃执行,把机会让给其它协程(即 yield from 或 await)

异步 IO,就是你发起一个 IO 操作,却不用等它结束,你可以继续做其他事情,当它结束时,你会得到通知。

可交给 asyncio 执行的任务,称为协程(coroutine)。一个协程可以放弃执行,把机会让给其它协程(即 yield from 或 await)

event_loop 事件循环:程序开启一个无限的循环,程序员会把一些函数注册到事件循环上。当满足事件发生的时候,调用相应的协程函数。
coroutine 协程:协程对象,指一个使用async关键字定义的函数,它的调用不会立即执行函数,而是会返回一个协程对象。协程对象需要注册到事件循环,由事件循环调用。
task 任务:一个协程对象就是一个原生可以挂起的函数,任务则是对协程进一步封装,其中包含任务的各种状态。
future: 代表将来执行或没有执行的任务的结果。它和task上没有本质的区别。
async/await 关键字:python3.5 用于定义协程的关键字,async定义一个协程,await用于挂起阻塞的异步调用接口。


一直对asyncio这个库比较感兴趣,毕竟这是官网也非常推荐的一个实现高并发的一个模块,python也是在python 3.4中引入了协程的概念。也通过这次整理更加深刻理解这个模块的使用

asyncio 是干什么的?

  • 异步网络操作
  • 并发
  • 协程

python3.0时代,标准库里的异步网络模块:select(非常底层) python3.0时代,第三方异步网络库:Tornado python3.4时代,asyncio:支持TCP,子进程

现在的asyncio,有了很多的模块已经在支持:aiohttp,aiodns,aioredis等等 https://github.com/aio-libs 这里列出了已经支持的内容,并在持续更新

当然到目前为止实现协程的不仅仅只有asyncio,tornado和gevent都实现了类似功能

关于asyncio的一些关键字的说明:

  • event_loop 事件循环:程序开启一个无限循环,把一些函数注册到事件循环上,当满足事件发生的时候,调用相应的协程函数
  • coroutine 协程:协程对象,指一个使用async关键字定义的函数,它的调用不会立即执行函数,而是会返回一个协程对象。协程对象需要注册到事件循环,由事件循环调用。
  • task 任务:一个协程对象就是一个原生可以挂起的函数,任务则是对协程进一步封装,其中包含了任务的各种状态
  • future: 代表将来执行或没有执行的任务的结果。它和task上没有本质上的区别
  • async/await 关键字:python3.5用于定义协程的关键字,async定义一个协程,await用于挂起阻塞的异步调用接口。

看了上面这些关键字,你可能扭头就走了,其实一开始了解和研究asyncio这个模块有种抵触,自己也不知道为啥,这也导致很长一段时间,这个模块自己也基本就没有关注和使用,但是随着工作上用python遇到各种性能问题的时候,自己告诉自己还是要好好学习学习这个模块。

定义一个协程

import time
import asyncio


now = lambda : time.time()


async def do_some_work(x):
    print("waiting:", x)

start = now()
# 这里是一个协程对象,这个时候do_some_work函数并没有执行
coroutine = do_some_work(2)
print(coroutine)
#  创建一个事件loop
loop = asyncio.get_event_loop()
# 将协程加入到事件循环loop
loop.run_until_complete(coroutine)

print("Time:",now()-start)

在上面带中我们通过async关键字定义一个协程(coroutine),当然协程不能直接运行,需要将协程加入到事件循环loop中

asyncio.get_event_loop:创建一个事件循环,然后使用run_until_complete将协程注册到事件循环,并启动事件循环

创建一个task

协程对象不能直接运行,在注册事件循环的时候,其实是run_until_complete方法将协程包装成为了一个任务(task)对象. task对象是Future类的子类,保存了协程运行后的状态,用于未来获取协程的结果

import asyncio
import time


now = lambda: time.time()


async def do_some_work(x):
    print("waiting:", x)

start = now()

coroutine = do_some_work(2)
loop = asyncio.get_event_loop()
task = loop.create_task(coroutine)
print(task)
loop.run_until_complete(task)
print(task)
print("Time:",now()-start)

结果为:

<Task pending coro=<do_some_work() running at /app/py_code/study_asyncio/simple_ex2.py:13>>
waiting: 2
<Task finished coro=<do_some_work() done, defined at /app/py_code/study_asyncio/simple_ex2.py:13> result=None>
Time: 0.0003514289855957031

创建task后,在task加入事件循环之前为pending状态,当完成后,状态为finished

关于上面通过loop.create_task(coroutine)创建task,同样的可以通过 asyncio.ensure_future(coroutine)创建task

关于这两个命令的官网解释: https://docs.python.org/3/library/asyncio-task.html#asyncio.ensure_future

asyncio.ensure_future(coro_or_future, *, loop=None)¶
Schedule the execution of a coroutine object: wrap it in a future. Return a Task object.

If the argument is a Future, it is returned directly.

https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.AbstractEventLoop.create_task

AbstractEventLoop.create_task(coro)
Schedule the execution of a coroutine object: wrap it in a future. Return a Task object.

Third-party event loops can use their own subclass of Task for interoperability. In this case, the result type is a subclass of Task.

This method was added in Python 3.4.2. Use the async() function to support also older Python versions.

绑定回调

绑定回调,在task执行完成的时候可以获取执行的结果,回调的最后一个参数是future对象,通过该对象可以获取协程返回值。

import time
import asyncio


now = lambda : time.time()


async def do_some_work(x):
    print("waiting:",x)
    return "Done after {}s".format(x)


def callback(future):
    print("callback:",future.result())


start = now()
coroutine = do_some_work(2)
loop = asyncio.get_event_loop()
task = asyncio.ensure_future(coroutine)
print(task)
task.add_done_callback(callback)
print(task)
loop.run_until_complete(task)

print("Time:", now()-start)

结果为:

<Task pending coro=<do_some_work() running at /app/py_code/study_asyncio/simple_ex3.py:13>>
<Task pending coro=<do_some_work() running at /app/py_code/study_asyncio/simple_ex3.py:13> cb=[callback() at /app/py_code/study_asyncio/simple_ex3.py:18]>
waiting: 2
callback: Done after 2s
Time: 0.00039196014404296875

通过add_done_callback方法给task任务添加回调函数,当task(也可以说是coroutine)执行完成的时候,就会调用回调函数。并通过参数future获取协程执行的结果。这里我们创建 的task和回调里的future对象实际上是同一个对象

阻塞和await

使用async可以定义协程对象,使用await可以针对耗时的操作进行挂起,就像生成器里的yield一样,函数让出控制权。协程遇到await,事件循环将会挂起该协程,执行别的协程,直到其他的协程也挂起或者执行完毕,再进行下一个协程的执行

耗时的操作一般是一些IO操作,例如网络请求,文件读取等。我们使用asyncio.sleep函数来模拟IO操作。协程的目的也是让这些IO操作异步化。

import asyncio
import time

now = lambda :time.time()

async def do_some_work(x):
    print("waiting:",x)
    # await 后面就是调用耗时的操作
    await asyncio.sleep(x)
    return "Done after {}s".format(x)


start = now()

coroutine = do_some_work(2)
loop = asyncio.get_event_loop()
task = asyncio.ensure_future(coroutine)
loop.run_until_complete(task)

print("Task ret:", task.result())
print("Time:", now() - start)

在await asyncio.sleep(x),因为这里sleep了,模拟了阻塞或者耗时操作,这个时候就会让出控制权。 即当遇到阻塞调用的函数的时候,使用await方法将协程的控制权让出,以便loop调用其他的协程。

并发和并行

并发指的是同时具有多个活动的系统

并行值得是用并发来使一个系统运行的更快。并行可以在操作系统的多个抽象层次进行运用

所以并发通常是指有多个任务需要同时进行,并行则是同一个时刻有多个任务执行

下面这个例子非常形象:

并发情况下是一个老师在同一时间段辅助不同的人功课。并行则是好几个老师分别同时辅助多个学生功课。简而言之就是一个人同时吃三个馒头还是三个人同时分别吃一个的情况,吃一个馒头算一个任务

import asyncio
import time


now = lambda :time.time()


async def do_some_work(x):
    print("Waiting:",x)
    await asyncio.sleep(x)
    return "Done after {}s".format(x)

start = now()

coroutine1 = do_some_work(1)
coroutine2 = do_some_work(2)
coroutine3 = do_some_work(4)

tasks = [
    asyncio.ensure_future(coroutine1),
    asyncio.ensure_future(coroutine2),
    asyncio.ensure_future(coroutine3)
]

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

for task in tasks:
    print("Task ret:",task.result())

print("Time:",now()-start)

运行结果:

Waiting: 1
Waiting: 2
Waiting: 4
Task ret: Done after 1s
Task ret: Done after 2s
Task ret: Done after 4s
Time: 4.004154920578003

总时间为4s左右。4s的阻塞时间,足够前面两个协程执行完毕。如果是同步顺序的任务,那么至少需要7s。此时我们使用了aysncio实现了并发。asyncio.wait(tasks) 也可以使用 asyncio.gather(*tasks) ,前者接受一个task列表,后者接收一堆task。

关于asyncio.gather和asyncio.wait官网的说明:

https://docs.python.org/3/library/asyncio-task.html#asyncio.gather

Return a future aggregating results from the given coroutine objects or futures.

All futures must share the same event loop. If all the tasks are done successfully, the returned future’s result is the list of results (in the order of the original sequence, not necessarily the order of results arrival). If return_exceptions is true, exceptions in the tasks are treated the same as successful results, and gathered in the result list; otherwise, the first raised exception will be immediately propagated to the returned future.

https://docs.python.org/3/library/asyncio-task.html#asyncio.wait

Wait for the Futures and coroutine objects given by the sequence futures to complete. Coroutines will be wrapped in Tasks. Returns two sets of Future: (done, pending).

The sequence futures must not be empty.

timeout can be used to control the maximum number of seconds to wait before returning. timeout can be an int or float. If timeout is not specified or None, there is no limit to the wait time.

return_when indicates when this function should return.

协程嵌套

使用async可以定义协程,协程用于耗时的io操作,我们也可以封装更多的io操作过程,这样就实现了嵌套的协程,即一个协程中await了另外一个协程,如此连接起来。

import asyncio
import time


now = lambda: time.time()

async def do_some_work(x):
    print("waiting:",x)
    await asyncio.sleep(x)
    return "Done after {}s".format(x)

async def main():
    coroutine1 = do_some_work(1)
    coroutine2 = do_some_work(2)
    coroutine3 = do_some_work(4)
    tasks = [
        asyncio.ensure_future(coroutine1),
        asyncio.ensure_future(coroutine2),
        asyncio.ensure_future(coroutine3)
    ]

    dones, pendings = await asyncio.wait(tasks)
    for task in dones:
        print("Task ret:", task.result())

    # results = await asyncio.gather(*tasks)
    # for result in results:
    #     print("Task ret:",result)


start = now()

loop = asyncio.get_event_loop()
loop.run_until_complete(main())
print("Time:", now()-start)

如果我们把上面代码中的:

    dones, pendings = await asyncio.wait(tasks)
    for task in dones:
        print("Task ret:", task.result())

替换为:

    results = await asyncio.gather(*tasks)
    for result in results:
        print("Task ret:",result)

这样得到的就是一个结果的列表

不在main协程函数里处理结果,直接返回await的内容,那么最外层的run_until_complete将会返回main协程的结果。 将上述的代码更改为:

import asyncio
import time


now = lambda: time.time()

async def do_some_work(x):
    print("waiting:",x)
    await asyncio.sleep(x)
    return "Done after {}s".format(x)

async def main():
    coroutine1 = do_some_work(1)
    coroutine2 = do_some_work(2)
    coroutine3 = do_some_work(4)
    tasks = [
        asyncio.ensure_future(coroutine1),
        asyncio.ensure_future(coroutine2),
        asyncio.ensure_future(coroutine3)
    ]
    return await asyncio.gather(*tasks)

start = now()

loop = asyncio.get_event_loop()
results = loop.run_until_complete(main())
for result in results:
    print("Task ret:",result)

print("Time:", now()-start)

或者返回使用asyncio.wait方式挂起协程。

将代码更改为:

import asyncio
import time


now = lambda: time.time()

async def do_some_work(x):
    print("waiting:",x)
    await asyncio.sleep(x)
    return "Done after {}s".format(x)

async def main():
    coroutine1 = do_some_work(1)
    coroutine2 = do_some_work(2)
    coroutine3 = do_some_work(4)
    tasks = [
        asyncio.ensure_future(coroutine1),
        asyncio.ensure_future(coroutine2),
        asyncio.ensure_future(coroutine3)
    ]
    return await asyncio.wait(tasks)

start = now()

loop = asyncio.get_event_loop()
done,pending = loop.run_until_complete(main())
for task in done:
    print("Task ret:",task.result())

print("Time:", now()-start)

也可以使用asyncio的as_completed方法

import asyncio
import time


now = lambda: time.time()

async def do_some_work(x):
    print("waiting:",x)
    await asyncio.sleep(x)
    return "Done after {}s".format(x)

async def main():
    coroutine1 = do_some_work(1)
    coroutine2 = do_some_work(2)
    coroutine3 = do_some_work(4)
    tasks = [
        asyncio.ensure_future(coroutine1),
        asyncio.ensure_future(coroutine2),
        asyncio.ensure_future(coroutine3)
    ]
    for task in asyncio.as_completed(tasks):
        result = await task
        print("Task ret: {}".format(result))

start = now()

loop = asyncio.get_event_loop()
loop.run_until_complete(main())
print("Time:", now()-start)

从上面也可以看出,协程的调用和组合非常灵活,主要体现在对于结果的处理:如何返回,如何挂起

协程的停止

future对象有几个状态:

  • Pending
  • Running
  • Done
  • Cacelled

创建future的时候,task为pending,事件循环调用执行的时候当然就是running,调用完毕自然就是done,如果需要停止事件循环,就需要先把task取消。可以使用asyncio.Task获取事件循环的task

import asyncio
import time


now = lambda :time.time()


async def do_some_work(x):
    print("Waiting:",x)
    await asyncio.sleep(x)
    return "Done after {}s".format(x)

coroutine1 =do_some_work(1)
coroutine2 =do_some_work(2)
coroutine3 =do_some_work(2)

tasks = [
    asyncio.ensure_future(coroutine1),
    asyncio.ensure_future(coroutine2),
    asyncio.ensure_future(coroutine3),
]

start = now()

loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(asyncio.wait(tasks))
except KeyboardInterrupt as e:
    print(asyncio.Task.all_tasks())
    for task in asyncio.Task.all_tasks():
        print(task.cancel())
    loop.stop()
    loop.run_forever()
finally:
    loop.close()

print("Time:",now()-start)

启动事件循环之后,马上ctrl+c,会触发run_until_complete的执行异常 KeyBorardInterrupt。然后通过循环asyncio.Task取消future。可以看到输出如下:

Waiting: 1
Waiting: 2
Waiting: 2
^C{<Task finished coro=<do_some_work() done, defined at /app/py_code/study_asyncio/simple_ex10.py:13> result='Done after 1s'>, <Task pending coro=<do_some_work() running at /app/py_code/study_asyncio/simple_ex10.py:15> wait_for=<Future pending cb=[Task._wakeup()]> cb=[_wait.<locals>._on_completion() at /usr/local/lib/python3.5/asyncio/tasks.py:428]>, <Task pending coro=<do_some_work() running at /app/py_code/study_asyncio/simple_ex10.py:15> wait_for=<Future pending cb=[Task._wakeup()]> cb=[_wait.<locals>._on_completion() at /usr/local/lib/python3.5/asyncio/tasks.py:428]>, <Task pending coro=<wait() running at /usr/local/lib/python3.5/asyncio/tasks.py:361> wait_for=<Future pending cb=[Task._wakeup()]>>}
False
True
True
True
Time: 1.0707225799560547

True表示cannel成功,loop stop之后还需要再次开启事件循环,最后在close,不然还会抛出异常

循环task,逐个cancel是一种方案,可是正如上面我们把task的列表封装在main函数中,main函数外进行事件循环的调用。这个时候,main相当于最外出的一个task,那么处理包装的main函数即可。

不同线程的事件循环

很多时候,我们的事件循环用于注册协程,而有的协程需要动态的添加到事件循环中。一个简单的方式就是使用多线程。当前线程创建一个事件循环,然后在新建一个线程,在新线程中启动事件循环。当前线程不会被block。

import asyncio
from threading import Thread
import time

now = lambda :time.time()

def start_loop(loop):
    asyncio.set_event_loop(loop)
    loop.run_forever()

def more_work(x):
    print('More work {}'.format(x))
    time.sleep(x)
    print('Finished more work {}'.format(x))

start = now()
new_loop = asyncio.new_event_loop()
t = Thread(target=start_loop, args=(new_loop,))
t.start()
print('TIME: {}'.format(time.time() - start))

new_loop.call_soon_threadsafe(more_work, 6)
new_loop.call_soon_threadsafe(more_work, 3)

启动上述代码之后,当前线程不会被block,新线程中会按照顺序执行call_soon_threadsafe方法注册的more_work方法, 后者因为time.sleep操作是同步阻塞的,因此运行完毕more_work需要大致6 + 3

新线程协程

import asyncio
import time
from threading import Thread

now = lambda :time.time()


def start_loop(loop):
    asyncio.set_event_loop(loop)
    loop.run_forever()

async def do_some_work(x):
    print('Waiting {}'.format(x))
    await asyncio.sleep(x)
    print('Done after {}s'.format(x))

def more_work(x):
    print('More work {}'.format(x))
    time.sleep(x)
    print('Finished more work {}'.format(x))

start = now()
new_loop = asyncio.new_event_loop()
t = Thread(target=start_loop, args=(new_loop,))
t.start()
print('TIME: {}'.format(time.time() - start))

asyncio.run_coroutine_threadsafe(do_some_work(6), new_loop)
asyncio.run_coroutine_threadsafe(do_some_work(4), new_loop)

上述的例子,主线程中创建一个new_loop,然后在另外的子线程中开启一个无限事件循环。 主线程通过run_coroutine_threadsafe新注册协程对象。这样就能在子线程中进行事件循环的并发操作,同时主线程又不会被block。一共执行的时间大概在6s左右。


python异步编程之asyncio(百万并发)

前言:python由于GIL(全局锁)的存在,不能发挥多核的优势,其性能一直饱受诟病。然而在IO密集型的网络编程里,异步处理比同步处理能提升成百上千倍的效率,弥补了python性能方面的短板,如最新的微服务框架japronto,resquests per second可达百万级。

python还有一个优势是库(第三方库)极为丰富,运用十分方便。asyncio是python3.4版本引入到标准库,python2x没有加这个库,毕竟python3x才是未来啊,哈哈!python3.5又加入了async/await特性。

在学习asyncio之前,我们先来理清楚同步/异步的概念

·同步是指完成事务的逻辑,先执行第一个事务,如果阻塞了,会一直等待,直到这个事务完成,再执行第二个事务,顺序执行。。。

·异步是和同步相对的,异步是指在处理调用这个事务的之后,不会等待这个事务的处理结果,直接处理第二个事务去了,通过状态、通知、回调来通知调用者处理结果。

一、asyncio

下面通过举例来对比同步代码和异步代码编写方面的差异,其次看下两者性能上的差距,我们使用sleep(1)模拟耗时1秒的io操作。

 ·同步代码

import time

def hello():
    time.sleep(1)

def run():
    for i in range(5):
        hello()
        print('Hello World:%s' % time.time())  # 任何伟大的代码都是从Hello World 开始的!
if __name__ == '__main__':
    run()<br>

输出:(间隔约是1s)

Hello World:1527595175.4728756
Hello World:1527595176.473001
Hello World:1527595177.473494
Hello World:1527595178.4739306
Hello World:1527595179.474482

 ·异步代码

import time
import asyncio

# 定义异步函数
async def hello():
    asyncio.sleep(1)
    print('Hello World:%s' % time.time())

def run():
    for i in range(5):
        loop.run_until_complete(hello())

loop = asyncio.get_event_loop()
if __name__ =='__main__':
    run()<br>

 输出:

Hello World:1527595104.8338501
Hello World:1527595104.8338501
Hello World:1527595104.8338501
Hello World:1527595104.8338501
Hello World:1527595104.8338501
async def 用来定义异步函数,其内部有异步操作。每个线程有一个事件循环,主线程调用asyncio.get_event_loop()时会创建事件循环,你需要把异步的任务丢给这个循环的run_until_complete()方法,事件循环会安排协同程序的执行。

二、aiohttp

  如果需要并发http请求怎么办呢,通常是用requests,但requests是同步的库,如果想异步的话需要引入aiohttp。这里引入一个类,from aiohttp import ClientSession,首先要建立一个session对象,然后用session对象去打开网页。session可以进行多项操作,比如post, get, put, head等。

基本用法:

async with ClientSession() as session:
    async with session.get(url) as response:

aiohttp异步实现的例子:

import asyncio
from aiohttp import ClientSession


tasks = []
url = "https://www.baidu.com/{}"
async def hello(url):
    async with ClientSession() as session:
        async with session.get(url) as response:
            response = await response.read()
            print(response)

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(hello(url))

首先async def 关键字定义了这是个异步函数,await 关键字加在需要等待的操作前面,response.read()等待request响应,是个耗IO操作。然后使用ClientSession类发起http请求。

多链接异步访问

如果我们需要请求多个URL该怎么办呢,同步的做法访问多个URL只需要加个for循环就可以了。但异步的实现方式并没那么容易,在之前的基础上需要将hello()包装在asyncio的Future对象中,然后将Future对象列表作为任务传递给事件循环

import time
import asyncio
from aiohttp import ClientSession

tasks = []
url = "https://www.baidu.com/{}"
async def hello(url):
    async with ClientSession() as session:
        async with session.get(url) as response:
            response = await response.read()
#            print(response)
            print('Hello World:%s' % time.time())

def run():
    for i in range(5):
        task = asyncio.ensure_future(hello(url.format(i)))
        tasks.append(task)


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    run()
    loop.run_until_complete(asyncio.wait(tasks))<br>

 输出:

Hello World:1527754874.8915546
Hello World:1527754874.899039
Hello World:1527754874.90004
Hello World:1527754874.9095392
Hello World:1527754874.9190395

 收集http响应

好了,上面介绍了访问不同链接的异步实现方式,但是我们只是发出了请求,如果要把响应一一收集到一个列表中,最后保存到本地或者打印出来要怎么实现呢,可通过asyncio.gather(*tasks)将响应全部收集起来,具体通过下面实例来演示。

import time
import asyncio
from aiohttp import ClientSession

tasks = []
url = "https://www.baidu.com/{}"
async def hello(url):
    async with ClientSession() as session:
        async with session.get(url) as response:
#            print(response)
            print('Hello World:%s' % time.time())
            return await response.read()

def run():
    for i in range(5):
        task = asyncio.ensure_future(hello(url.format(i)))
        tasks.append(task)
    result = loop.run_until_complete(asyncio.gather(*tasks))
    print(result)

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    run()

 输出:

Hello World:1527765369.0785167
Hello World:1527765369.0845182
Hello World:1527765369.0910277
Hello World:1527765369.0920424
Hello World:1527765369.097017
[b'<!DOCTYPE html>\r\n<!--STATUS OK-->\r\n<html>\r\n<head>\r\n......

异常解决

假如你的并发达到2000个,程序会报错:ValueError: too many file descriptors in select()。报错的原因字面上看是 Python 调取的 select 对打开的文件有最大数量的限制,这个其实是操作系统的限制,linux打开文件的最大数默认是1024,windows默认是509,超过了这个值,程序就开始报错。这里我们有三种方法解决这个问题:

1.限制并发数量。(一次不要塞那么多任务,或者限制最大并发数量)

2.使用回调的方式

3.修改操作系统打开文件数的最大限制,在系统里有个配置文件可以修改默认值,具体步骤不再说明了。

不修改系统默认配置的话,个人推荐限制并发数的方法,设置并发数为500,处理速度更快。

#coding:utf-8
import time,asyncio,aiohttp


url = 'https://www.baidu.com/'
async def hello(url,semaphore):
    async with semaphore:
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as response:
                return await response.read()


async def run():
    semaphore = asyncio.Semaphore(500) # 限制并发量为500
    to_get = [hello(url.format(),semaphore) for _ in range(1000)] #总共1000任务
    await asyncio.wait(to_get)


if __name__ == '__main__':
#    now=lambda :time.time()
    loop = asyncio.get_event_loop()
    loop.run_until_complete(run())
    loop.close()

(译)aiohttp 官方文档

iohttp分为服务器端和客户端,本文只介绍客户端。

由于上下文的缘故,请求代码必须在一个异步的函数中进行:

async def fn():

pass

1. aiohttp安装

pip install aiohttp

1.1. 基本请求用法

async with aiohttp.request('GET','https://github.com') as r:
        await r.text()

其中r.text(), 可以在括号中指定解码方式,编码方式,例如

await resp.text(encoding='windows-1251')

或者也可以选择不编码,适合读取图像等,是无法编码的

await resp.read()
#使用示例, 进行一次请求
 
import aiohttp, asyncio
 
async def main():#aiohttp必须放在异步函数中使用
    async with aiohttp.request('GET', 'https://api.github.com/events') as resp:
        json = await resp.json()
        print(json)
 
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
 
------------------------------------------------------------------------------
#使用示例,进行多次请求
 
import aiohttp, asyncio
 
async def main():#aiohttp必须放在异步函数中使用
    tasks = []
    [tasks.append(fetch('https://api.github.com/events?a={}'.format(i))) for i in range(10)]#十次请求
    await asyncio.wait(tasks)
 
async def fetch(url):
    async with aiohttp.request('GET', url) as resp:
        json = await resp.json()
        print(json)    
 
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
------------------------------------------------------------------------------
#使用示例,进行多次请求,并限制同时请求的数量
 
import aiohttp, asyncio
 
async def main(pool):#aiohttp必须放在异步函数中使用
    tasks = []
    sem = asyncio.Semaphore(pool)#限制同时请求的数量
    [tasks.append(control_sem(sem, 'https://api.github.com/events?a={}'.format(i))) for i in range(10)]#十次请求
    await asyncio.wait(tasks)
 
async def control_sem(sem, url):#限制信号量
    async with sem:
        await fetch(url)
 
async def fetch(url):
    async with aiohttp.request('GET', url) as resp:
        json = await resp.json()
        print(json)    
 
loop = asyncio.get_event_loop()
loop.run_until_complete(main(pool=2))

上面的示例中可以正确的使用协程进行请求,但是由于aiohttp自身的原因会报 Unclosed client session 的警告。官方不推荐使用aiohttp.request的方式请求,可以将 aiohttp.request 换成 aiohttp.ClientSession(**kw).request的方式即可。 

具体请看2.发起一个session请求

2.发起一个session请求

首先是导入aiohttp模块:

import aiohttp

然后我们试着获取一个web源码,这里以GitHub的public Time-line页面为例:

async with aiohttp.ClientSession() as session:
    async with session.get('https://api.github.com/events') as resp:
        print(resp.status)
        print(await resp.text())

上面的代码中,我们创建了一个 ClientSession 对象命名为session,然后通过session的get方法得到一个 ClientResponse 对象,命名为resp,get方法中传入了一个必须的参数url,就是要获得源码的http url。至此便通过协程完成了一个异步IO的get请求。

有get请求当然有post请求,并且post请求也是一个协程:

session.post('http://httpbin.org/post', data=b'data')

用法和get是一样的,区别是post需要一个额外的参数data,即是需要post的数据。

除了get和post请求外,其他http的操作方法也是一样的:

session.put('http://httpbin.org/put', data=b'data')
session.delete('http://httpbin.org/delete')
session.head('http://httpbin.org/get')
session.options('http://httpbin.org/get')
session.patch('http://httpbin.org/patch', data=b'data')

小记:

不要为每次的连接都创建一次session,一般情况下只需要创建一个session,然后使用这个session执行所有的请求。

每个session对象,内部包含了一个连接池,并且将会保持连接和连接复用(默认开启)可以加快整体的性能。

#使用示例
 
import aiohttp, asyncio
 
async def main(pool):#启动
    sem = asyncio.Semaphore(pool)
    async with aiohttp.ClientSession() as session:#给所有的请求,创建同一个session
        tasks = []
        [tasks.append(control_sem(sem, 'https://api.github.com/events?a={}'.format(i), session)) for i in range(10)]#十次请求
        await asyncio.wait(tasks)
 
async def control_sem(sem, url, session):#限制信号量
    async with sem:
        await fetch(url, session)
 
async def fetch(url, session):#开启异步请求
    async with session.get(url) as resp:
        json = await resp.json()
        print(json)
 
loop = asyncio.get_event_loop()
loop.run_until_complete(main(pool=2))

速度快的不要不要的

3.在URL中传递参数

我们经常需要通过 get 在url中传递一些参数,参数将会作为url问号后面的一部分发给服务器。在aiohttp的请求中,允许以dict的形式来表示问号后的参数。举个例子,如果你想传递 key1=value1   key2=value2 到 httpbin.org/get 你可以使用下面的代码:

params = {'key1': 'value1', 'key2': 'value2'}async with session.get('http://httpbin.org/get',
                       params=params) as resp:
                       assert resp.url == 'http://httpbin.org/get?key2=value2&key1=value1'

可以看到,代码正确的执行了,说明参数被正确的传递了进去。不管是一个参数两个参数,还是更多的参数,都可以通过这种方式来传递。除了这种方式之外,还有另外一个,使用一个 list 来传递(这种方式可以传递一些特殊的参数,例如下面两个key是相等的也可以正确传递):

params = [('key', 'value1'), ('key', 'value2')]
async with session.get('http://httpbin.org/get',
                  params=params) as r:
    assert r.url == 'http://httpbin.org/get?key=value2&key=value1'

除了上面两种,我们也可以直接通过传递字符串作为参数来传递,但是需要注意,通过字符串传递的特殊字符不会被编码:

async with session.get('http://httpbin.org/get',                       params='key=value+1') as r: 
       assert r.url == 'http://httpbin.org/get?key=value+1'

4.响应的内容

还是以GitHub的公共Time-line页面为例,我们可以获得页面响应的内容:

async with session.get('https://api.github.com/events') as resp:    print(await resp.text())

运行之后,会打印出类似于如下的内容:

'[{"created_at":"2015-06-12T14:06:22Z","public":true,"actor":{...

resp的text方法,会自动将服务器端返回的内容进行解码–decode,当然我们也可以自定义编码方式:

await resp.text(encoding='gb2312')

除了text方法可以返回解码后的内容外,我们也可以得到类型是字节的内容:

print(await resp.read())

运行的结果是:

b'[{"created_at":"2015-06-12T14:06:22Z","public":true,"actor":{...

gzip和deflate转换编码已经为你自动解码。

小记:

text(),read()方法是把整个响应体读入内存,如果你是获取大量的数据,请考虑使用”字节流“(streaming response)

5.特殊响应内容:json

如果我们获取的页面的响应内容是json,aiohttp内置了更好的方法来处理json:

async with session.get('https://api.github.com/events') as resp:    print(await resp.json())

如果因为某种原因而导致resp.json()解析json失败,例如返回不是json字符串等等,那么resp.json()将抛出一个错误,也可以给json()方法指定一个解码方式:

print(await resp.json(encoding='gb2312')) 

或者传递一个函数进去:

print(await resp.json( lambda(x:x.replace('a','b')) ))

6.以字节流的方式读取响应内容

虽然json(),text(),read()很方便的能把响应的数据读入到内存,但是我们仍然应该谨慎的使用它们,因为它们是把整个的响应体全部读入了内存。即使你只是想下载几个字节大小的文件,但这些方法却将在内存中加载所有的数据。所以我们可以通过控制字节数来控制读入内存的响应内容:

async with session.get('https://api.github.com/events') as resp: 
   await resp.content.read(10) #读取前10个字节

一般地,我们应该使用以下的模式来把读取的字节流保存到文件中:

with open(filename, 'wb') as fd:
    while True:
        chunk = await resp.content.read(chunk_size)
        if not chunk:
            break
        fd.write(chunk)

7.自定义请求头

如果你想添加请求头,可以像get添加参数那样以dict的形式,作为get或者post的参数进行请求:

import json
url = 'https://api.github.com/some/endpoint'
payload = {'some': 'data'}
headers = {'content-type': 'application/json'}
 
await session.post(url,
                   data=json.dumps(payload),
                   headers=headers)

8.自定义Cookie

给服务器发送cookie,可以通过给 ClientSession 传递一个cookie参数:

url = 'http://httpbin.org/cookies'
cookies = {'cookies_are': 'working'}
async with ClientSession(cookies=cookies) as session:
    async with session.get(url) as resp:
        assert await resp.json() == {
           "cookies": {"cookies_are": "working"}}

可直接访问链接 “httpbin.org/cookies”查看当前cookie,访问session中的cookie请见第10节。

9.post数据的几种方式

(1)模拟表单post数据

payload = {'key1': 'value1', 'key2': 'value2'}
async with session.post('http://httpbin.org/post',
                        data=payload) as resp:
    print(await resp.text())

注意:data=dict的方式post的数据将被转码,和form提交数据是一样的作用,如果你不想被转码,可以直接以字符串的形式 data=str 提交,这样就不会被转码。

(2)post json

import json
url = 'https://api.github.com/some/endpoint'
payload = {'some': 'data'}
 async with session.post(url, data=json.dumps(payload)) as resp:
    ...

其实json.dumps(payload)返回的也是一个字符串,只不过这个字符串可以被识别为json格式

(3)post 小文件

url = 'http://httpbin.org/post'
files = {'file': open('report.xls', 'rb')}
 await session.post(url, data=files)

可以设置好文件名和content-type:

url = 'http://httpbin.org/post'
data = FormData()
data.add_field('file',
               open('report.xls', 'rb'),
               filename='report.xls',
               content_type='application/vnd.ms-excel')
 await session.post(url, data=data)

如果将文件对象设置为数据参数,aiohttp将自动以字节流的形式发送给服务器。

(4)post 大文件

aiohttp支持多种类型的文件以流媒体的形式上传,所以我们可以在文件未读入内存的情况下发送大文件。

@aiohttp.streamer
def file_sender(writer, file_name=None):
    with open(file_name, 'rb') as f:
        chunk = f.read(2**16)
        while chunk:
            yield from writer.write(chunk)
            chunk = f.read(2**16)
 # Then you can use `file_sender` as a data provider:
 async with session.post('http://httpbin.org/post',
                        data=file_sender(file_name='huge_file')) as resp:
    print(await resp.text())

同时我们可以从一个url获取文件后,直接post给另一个url,并计算hash值:

async def feed_stream(resp, stream):
    h = hashlib.sha256()
     while True:
        chunk = await resp.content.readany()
        if not chunk:
            break
       h.update(chunk)
        stream.feed_data(chunk)
     return h.hexdigest()
 resp = session.get('http://httpbin.org/post')

stream=StreamReader()
loop.create_task(session.post('http://httpbin.org/post', data=stream))
file_hash = await feed_stream(resp, stream)

因为响应内容类型是StreamReader,所以可以把get和post连接起来,同时进行post和get:

r = await session.get('http://python.org')

await session.post('http://httpbin.org/post',
                   data=r.content)

(5)post预压缩数据

在通过aiohttp发送前就已经压缩的数据, 调用压缩函数的函数名(通常是deflate 或 zlib)作为content-encoding的值:

async def my_coroutine(session, headers, my_data):
    data = zlib.compress(my_data)
    headers = {'Content-Encoding': 'deflate'}
    async with session.post('http://httpbin.org/post',
                            data=data,
                            headers=headers)
        pass

10.keep-alive, 连接池,共享cookie

ClientSession 用于在多个连接之间共享cookie:

async with aiohttp.ClientSession() as session:
    await session.get(
        'http://httpbin.org/cookies/set?my_cookie=my_value')
    filtered = session.cookie_jar.filter_cookies('http://httpbin.org')
    assert filtered['my_cookie'].value == 'my_value'
    async with session.get('http://httpbin.org/cookies') as r:
        json_body = await r.json()
        assert json_body['cookies']['my_cookie'] == 'my_value'

也可以为所有的连接设置共同的请求头:

async with aiohttp.ClientSession(
    headers={"Authorization": "Basic bG9naW46cGFzcw=="}) as session:
    async with session.get("http://httpbin.org/headers") as r:
        json_body = await r.json()
        assert json_body['headers']['Authorization'] == \
            'Basic bG9naW46cGFzcw=='

ClientSession 还支持 keep-alive连接和连接池(connection pooling)

11.cookie安全性

默认ClientSession使用的是严格模式的 aiohttp.CookieJar. RFC 2109,明确的禁止接受url和ip地址产生的cookie,只能接受 DNS 解析IP产生的cookie。可以通过设置aiohttp.CookieJar 的 unsafe=True 来配置:

jar = aiohttp.CookieJar(unsafe=True)
session = aiohttp.ClientSession(cookie_jar=jar)

12.控制同时连接的数量(连接池)

也可以理解为同时请求的数量,为了限制同时打开的连接数量,我们可以将限制参数传递给连接器:

conn = aiohttp.TCPConnector(limit=30)#同时最大进行连接的连接数为30,默认是100,limit=0的时候是无限制

限制同时打开限制同时打开连接到同一端点的数量((host, port, is_ssl) 三的倍数),可以通过设置 limit_per_host 参数:

conn = aiohttp.TCPConnector(limit_per_host=30)#默认是0

13.自定义域名解析

我们可以指定域名服务器的 IP 对我们提供的get或post的url进行解析:

from aiohttp.resolver import AsyncResolver
 resolver = AsyncResolver(nameservers=["8.8.8.8", "8.8.4.4"])
 conn = aiohttp.TCPConnector(resolver=resolver)

14.设置代理

aiohttp支持使用代理来访问网页:

async with aiohttp.ClientSession() as session:
    async with session.get("http://python.org",
                           proxy="http://some.proxy.com") as resp:
        print(resp.status)

当然也支持需要授权的页面:

async with aiohttp.ClientSession() as session:
    proxy_auth = aiohttp.BasicAuth('user', 'pass')
    async with session.get("http://python.org",
                           proxy="http://some.proxy.com",
                           proxy_auth=proxy_auth) as resp:
        print(resp.status)

或者通过这种方式来验证授权:

session.get("http://python.org",
            proxy="http://user:pass@some.proxy.com")

15.响应状态码 response status code

可以通过 resp.status来检查状态码是不是200:

async with session.get('http://httpbin.org/get') as resp:
    assert resp.status == 200

16.响应头

我们可以直接使用 resp.headers 来查看响应头,得到的值类型是一个dict:

>>> resp.headers
{'ACCESS-CONTROL-ALLOW-ORIGIN': '*',
 'CONTENT-TYPE': 'application/json',
 'DATE': 'Tue, 15 Jul 2014 16:49:51 GMT',
 'SERVER': 'gunicorn/18.0',
 'CONTENT-LENGTH': '331',
 'CONNECTION': 'keep-alive'}

或者我们可以查看原生的响应头:

>>> resp.raw_headers
((b'SERVER', b'nginx'),
 (b'DATE', b'Sat, 09 Jan 2016 20:28:40 GMT'),
 (b'CONTENT-TYPE', b'text/html; charset=utf-8'),
 (b'CONTENT-LENGTH', b'12150'),
 (b'CONNECTION', b'keep-alive'))

17.查看cookie

url = 'http://example.com/some/cookie/setting/url'
async with session.get(url) as resp:
    print(resp.cookies)

18.重定向的响应头

如果一个请求被重定向了,我们依然可以查看被重定向之前的响应头信息:

>>> resp = await session.get('http://example.com/some/redirect/')
>>> resp
<ClientResponse(http://example.com/some/other/url/) [200]>
>>> resp.history
(<ClientResponse(http://example.com/some/redirect/) [301]>,)

19.超时处理

默认的IO操作都有5分钟的响应时间 我们可以通过 timeout 进行重写:

async with session.get('https://github.com', timeout=60) as r:
    ...

如果 timeout=None 或者 timeout=0 将不进行超时检查,也就是不限时长。

作者:

喜欢围棋和编程。

 
发布于 分类 编程标签

发表评论

电子邮件地址不会被公开。