Python异步编程的几个概念

协程函数(Coroutine Function)

在Python中,使用async def定义的函数就是协程函数,下面这段代码中,我们定义了一个名为asyncsleep的协程函数

1
2
async def asyncsleep(sec):
await asyncio.sleep(sec)

协程对象(Coroutine Object)

在Python中,当我们直接调用协程函数时,Python不会直接运行这个函数内的任何代码,而是返回一个协程对象,以下代码中的crt变量就是调用asyncsleep(1)后返回的协程对象

1
2
3
4
5
6
7
8
9
import asyncio

async def asyncsleep(sec):
await asyncio.sleep(sec)

crt = asyncsleep(1)
print(crt)

输出:<coroutine object asyncsleep at 0x7f5dcec10270>

Task对象

在这里我们可以先看一下Task对象是怎么运行的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import asyncio

async def tsk(num):
await asyncio.sleep(1)
return num

async def main():
crt = tsk(10)
print(crt)

task = asyncio.create_task(crt)
print(task)

await task
print(task)

if __name__ == "__main__":
asyncio.run(main())

输出:
<coroutine object tsk at 0x7f32908051c0>
<Task pending name='Task-2' coro=<tsk() running at /workspace/PythonProject/main.py:3>>
<Task finished name='Task-2' coro=<tsk() done, defined at /workspace/PythonProject/main.py:3> result=10>

我们可以看出,在main函数中,crt首先被包装为处于pending状态的Task对象,在经过await后,其状态变成了finished,同时返回了一个值为10的result。其实,Task对象就是被asyncio.create_task()封装、注册进事件循环中的协程对象,以便当事件循环获得线程控制权时,可以异步执行这个特殊对象

事件循环(Event Loop)

事件循环是asyncio的核心,是asyncio中的低层级API。它的主要功能是运行异步任务和回调,执行网络 IO 操作,以及运行子进程,见下面的示意图
事件循环示意图
结合之前的内容我们可以知道,对于协程对象,我们是没法直接运行它的内部功能的,必须通过await运行。比如在下面这段代码中,我们使用await直接运行了两个协程对象中的功能,最终耗时3秒。在这里,await的作用可以理解为阻塞运行了asyncsleep(1)asyncsleep(2)两个协程对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import asyncio
import time

async def asyncsleep(sec):
print(f"sleep time: {sec}")
await asyncio.sleep(sec)

async def main():
print(time.strftime("%X"))
# await运行的是协程对象
await asyncsleep(1)
await asyncsleep(2)
print(time.strftime("%X"))

if __name__ == "__main__":
asyncio.run(main())

输出:
03:15:38
sleep time: 1
sleep time: 2
03:15:41

以上的代码显然不能满足我们的要求,如果我们想让程序异步等待,就必须将协程对象封装成Task对象,将其注册进事件循环中,再通过await运行。比如在下面这段代码的main()函数中,我们使用create_task()创建了task1、task2两个Task对象,然后使用asyncio.run()运行事件循环,可以发现,程序只消耗了2秒就完成了运行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import asyncio
import time

async def asyncsleep(sec):
print(f"sleep time: {sec}")
await asyncio.sleep(sec)

async def main():
task1 = asyncio.create_task(asyncsleep(1))
task2 = asyncio.create_task(asyncsleep(2))

print("Start Tasks")
print(time.strftime("%X"))
# await运行的是Task对象
await task1
await task2
print(time.strftime("%X"))

if __name__ == "__main__":
asyncio.run(main())

输出:
Start Tasks
03:11:40
sleep time: 1
sleep time: 2
03:11:42

其实事件循环本质上是一个无限循环,我们可以使用asyncio.get_event_loop()等方法获取或创建它。它主要工作就是当我们使用await关键字处理Task对象时执行Task内部的功能,当遇到Task内部的阻塞等待时,便将Task挂起,执行其他的处于pending状态的Task,同时它会在循环中检查挂起的Task是否处于finished状态了,如果是就收集返回的result。当所有await的Task都完成后,事件循环便终止,将线程控制权交出

asyncio的应用场景

需要注意的是,使用asyncio的程序仍是单线程运行的,它只是利用了CPU在处理IO密集型任务时的等待时间,让CPU可以处理其他任务,所以当程序需要阻塞线程或处理计算密集型任务时,异步编程就没有优势了。比如下面的代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import asyncio
import time

async def blockingsleep(sec):
print(f"blocking sleep {sec}")
time.sleep(sec)

async def main():
task1 = asyncio.create_task(blockingsleep(1))
task2 = asyncio.create_task(blockingsleep(2))

print(time.strftime("%X"))
# sleep会阻塞当前线程,task2必须等待task1完成后才能执行
await task1
await task2
print(time.strftime("%X"))

if __name__ == "__main__":
asyncio.run(main())

输出:
03:14:02
blocking sleep 1
blocking sleep 2
03:14:05
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import asyncio

async def addup(num):
print(f"addup: {num}")
res = 0
for i in range(1, num):
res = res + i
print(res)

async def main():
task1 = asyncio.create_task(addup(100000001))
task2 = asyncio.create_task(addup(100001))

# 在运行task1时,CPU在处理计算密集型任务而不是处于等待状态,此时task2没法异步运行
await task1
await task2

if __name__ == "__main__":
asyncio.run(main())

所以我们可以看出asyncio的应用场景主要是IO密集型任务,如网络读取、硬盘读取、数据库读取等

asyncio并发限制

有时我们需要对协程数量进行一定限制,比如在网络请求时,应该考虑站点的并发能力来设置请求规模,否则可能会被目标站点识别为网络攻击。在asyncio中提供了信号量对象Semaphore()来限制协程数量,具体使用可参考下面的代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import asyncio

async def tsk(sem, x):
async with sem:
print(f"Start Coroutine: {x}")
await asyncio.sleep(1)
print("Coroutine Done")

async def main():
# 设置协程数量为10
sem = asyncio.Semaphore(10)
tasks = [tsk(sem, n) for n in range(50)]
await asyncio.gather(*tasks)

if __name__ == "__main__":
asyncio.run(main())

运行以上代码可以发现,同时运行的协程对象数量会被限制在10个,这样就可以很方便地限制并发数量。但是在使用 asyncio.Semaphore()也有一些点需要注意,具体见下面的内容

不能使用async with … as …管理信号量对象

在使用asyncio.Semaphore()时不能使用async with ... as ...上下文管理语法,比如

1
2
3
4
5
6
7
8
9
10
11
12
13
import asyncio

async def main():
async with (sem:=asyncio.Semaphore(2)) as from_aenter:
print(f'{sem}')
print(f'{from_aenter}')

if __name__ == "__main__":
asyncio.run(main())

输出:
<asyncio.locks.Semaphore object at 0x7f197fc47550 [unlocked, value:1]>
None

这是因为使用async with ... as ...语法的对象需要实现__aenter__方法,而在asyncio中信号量对象__aenter__方法返回值为None

asyncio.Semaphore()的使用位置

在下面的代码中,我们设置了同时等待的协程对象最多为10个,预计需要时间为5秒。但是运行代码可以发现,完成所有任务只消耗了1秒

1
2
3
4
5
6
7
8
9
10
11
12
13
import asyncio
import time

async def main():
sem = asyncio.Semaphore(10)
async with sem:
tasks = [asyncio.sleep(1) for _ in range(50)]
print(time.strftime("%X"))
await asyncio.gather(*tasks)
print(time.strftime("%X"))

if __name__ == "__main__":
asyncio.run(main())

查看文档我们可以发现和下面演示代码一样的内容。即在使用async with ...时实际是协程对象 执行acquire()向信号量对象申请使用闲置队列使计数器减值,协程对象内部功能完成后执行release()向信号量对象释放队列使计数器增值。所以,在使用Semaphore()时,应该在每一个需要执行的协程对象内部 使用,由协程对象向信号量对象主动请求闲置队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
"""1"""
sem = asyncio.Semaphore(10)
# ... later
async with sem:
# work with shared resource

which is equivalent to:

"""2"""
sem = asyncio.Semaphore(10)
# ... later
await sem.acquire()
try:
# work with shared resource
finally:
sem.release()