Python异步编程的几个概念 协程函数(Coroutine Function) 在Python中,使用async def
定义的函数就是协程函数,下面这段代码中,我们定义了一个名为asyncsleep
的协程函数
1 2 async def asyncsleep (sec ): await asyncio.sleep(sec)
协程对象(Coroutine Object) 在Python中,当我们直接调用协程函数时,Python不会直接运行这个函数内的任何代码,而是返回一个协程对象,以下代码中的crt
变量就是调用asyncsleep(1)
后返回的协程对象
1 2 3 4 5 6 7 8 9 import asyncioasync def asyncsleep (sec ): await asyncio.sleep(sec) crt = asyncsleep(1 ) print (crt)输出:<coroutine object asyncsleep at 0x7f5dcec10270 >
Task对象 在这里我们可以先看一下Task对象是怎么运行的
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 import asyncioasync def tsk (num ): await asyncio.sleep(1 ) return num async def main (): crt = tsk(10 ) print (crt) task = asyncio.create_task(crt) print (task) await task print (task) if __name__ == "__main__" : asyncio.run(main()) 输出: <coroutine object tsk at 0x7f32908051c0 > <Task pending name='Task-2' coro=<tsk() running at /workspace/PythonProject/main.py:3 >> <Task finished name='Task-2' coro=<tsk() done, defined at /workspace/PythonProject/main.py:3 > result=10 >
我们可以看出,在main函数中,crt
首先被包装为处于pending
状态的Task对象,在经过await
后,其状态变成了finished
,同时返回了一个值为10的result
。其实,Task对象就是被asyncio.create_task()
封装、注册进事件循环中的协程对象,以便当事件循环获得线程控制权时,可以异步执行这个特殊对象
事件循环(Event Loop) 事件循环是asyncio的核心,是asyncio中的低层级API。它的主要功能是运行异步任务和回调,执行网络 IO 操作,以及运行子进程,见下面的示意图 结合之前的内容我们可以知道,对于协程对象,我们是没法直接运行它的内部功能的,必须通过await
运行。比如在下面这段代码中,我们使用await
直接运行了两个协程对象中的功能,最终耗时3秒。在这里,await
的作用可以理解为阻塞运行了asyncsleep(1)
和asyncsleep(2)
两个协程对象
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 import asyncioimport timeasync def asyncsleep (sec ): print (f"sleep time: {sec} " ) await asyncio.sleep(sec) async def main (): print (time.strftime("%X" )) await asyncsleep(1 ) await asyncsleep(2 ) print (time.strftime("%X" )) if __name__ == "__main__" : asyncio.run(main()) 输出: 03:15 :38 sleep time: 1 sleep time: 2 03:15 :41
以上的代码显然不能满足我们的要求,如果我们想让程序异步等待,就必须将协程对象封装成Task对象,将其注册进事件循环中,再通过await
运行。比如在下面这段代码的main()函数中,我们使用create_task()
创建了task1、task2两个Task对象,然后使用asyncio.run()
运行事件循环,可以发现,程序只消耗了2秒就完成了运行
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 import asyncioimport timeasync def asyncsleep (sec ): print (f"sleep time: {sec} " ) await asyncio.sleep(sec) async def main (): task1 = asyncio.create_task(asyncsleep(1 )) task2 = asyncio.create_task(asyncsleep(2 )) print ("Start Tasks" ) print (time.strftime("%X" )) await task1 await task2 print (time.strftime("%X" )) if __name__ == "__main__" : asyncio.run(main()) 输出: Start Tasks 03:11 :40 sleep time: 1 sleep time: 2 03:11 :42
其实事件循环本质上是一个无限循环,我们可以使用asyncio.get_event_loop()
等方法获取或创建它。它主要工作就是当我们使用await
关键字处理Task对象时执行Task内部的功能,当遇到Task内部的阻塞等待时,便将Task挂起,执行其他的处于pending
状态的Task,同时它会在循环中检查挂起的Task是否处于finished
状态了,如果是就收集返回的result
。当所有await的Task都完成后,事件循环便终止,将线程控制权交出
asyncio的应用场景 需要注意的是,使用asyncio的程序仍是单线程运行的,它只是利用了CPU在处理IO密集型任务时的等待时间,让CPU可以处理其他任务,所以当程序需要阻塞线程或处理计算密集型任务时,异步编程就没有优势了。比如下面的代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 import asyncioimport timeasync def blockingsleep (sec ): print (f"blocking sleep {sec} " ) time.sleep(sec) async def main (): task1 = asyncio.create_task(blockingsleep(1 )) task2 = asyncio.create_task(blockingsleep(2 )) print (time.strftime("%X" )) await task1 await task2 print (time.strftime("%X" )) if __name__ == "__main__" : asyncio.run(main()) 输出: 03:14 :02 blocking sleep 1 blocking sleep 2 03:14 :05
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 import asyncioasync def addup (num ): print (f"addup: {num} " ) res = 0 for i in range (1 , num): res = res + i print (res) async def main (): task1 = asyncio.create_task(addup(100000001 )) task2 = asyncio.create_task(addup(100001 )) await task1 await task2 if __name__ == "__main__" : asyncio.run(main())
所以我们可以看出asyncio的应用场景主要是IO密集型任务,如网络读取、硬盘读取、数据库读取等
asyncio并发限制 有时我们需要对协程数量进行一定限制,比如在网络请求时,应该考虑站点的并发能力来设置请求规模,否则可能会被目标站点识别为网络攻击。在asyncio中提供了信号量对象Semaphore()
来限制协程数量,具体使用可参考下面的代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 import asyncioasync def tsk (sem, x ): async with sem: print (f"Start Coroutine: {x} " ) await asyncio.sleep(1 ) print ("Coroutine Done" ) async def main (): sem = asyncio.Semaphore(10 ) tasks = [tsk(sem, n) for n in range (50 )] await asyncio.gather(*tasks) if __name__ == "__main__" : asyncio.run(main())
运行以上代码可以发现,同时运行的协程对象数量会被限制在10个,这样就可以很方便地限制并发数量。但是在使用 asyncio.Semaphore()
也有一些点需要注意,具体见下面的内容
不能使用async with … as …管理信号量对象 在使用asyncio.Semaphore()
时不能使用async with ... as ...
上下文管理语法,比如
1 2 3 4 5 6 7 8 9 10 11 12 13 import asyncioasync def main (): async with (sem:=asyncio.Semaphore(2 )) as from_aenter: print (f'{sem} ' ) print (f'{from_aenter} ' ) if __name__ == "__main__" : asyncio.run(main()) 输出: <asyncio.locks.Semaphore object at 0x7f197fc47550 [unlocked, value:1 ]> None
这是因为使用async with ... as ...
语法的对象需要实现__aenter__
方法,而在asyncio中信号量对象 的__aenter__
方法返回值为None
asyncio.Semaphore()
的使用位置在下面的代码中,我们设置了同时等待的协程对象最多为10个,预计需要时间为5秒。但是运行代码可以发现,完成所有任务只消耗了1秒
1 2 3 4 5 6 7 8 9 10 11 12 13 import asyncioimport timeasync def main (): sem = asyncio.Semaphore(10 ) async with sem: tasks = [asyncio.sleep(1 ) for _ in range (50 )] print (time.strftime("%X" )) await asyncio.gather(*tasks) print (time.strftime("%X" )) if __name__ == "__main__" : asyncio.run(main())
查看文档 我们可以发现和下面演示代码一样的内容。即在使用async with ...
时实际是协程对象 执行acquire()
向信号量对象申请使用闲置队列使计数器减值,协程对象内部功能完成后执行release()
向信号量对象释放队列使计数器增值。所以,在使用Semaphore()
时,应该在每一个需要执行的协程对象内部 使用,由协程对象向信号量对象主动请求闲置队列
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 """1""" sem = asyncio.Semaphore(10 ) async with sem: which is equivalent to: """2""" sem = asyncio.Semaphore(10 ) await sem.acquire()try : finally : sem.release()