一次性学会python asyncio

最早python发布asyncio的时候,我就尝试去了解过它的原理与使用。

今天,python asyncio的生态已经趋于成熟,无论是你想编写TCP/UDP服务端/客户端,还是使用异步mysql、Kafka、redis等,均已经有了开箱即用的标准库以及开源库,并且asyncio已经成了这些类库所依托的基础平台设施。

asyncio采用async/await的协程编程模型,在nodejs/c#等语言中都是采用了这种语法,我们一旦能够理解python asyncio,也就理解了这一类协程编程模型。

asyncio技术体系

asyncio底层依托于event loop,常见就是linux平台的epoll机制。

协程只是对event loop使用的一种抽象与编程简化。

asyncio在代码实现上是自底而上分层的:

  • coroutine协程,tasks任务(asyncio最核心的理解部分)
  • streams网络流(TCP、UDP)
  • synchronzation同步操作(锁、条件变量)
  • subprocess子进程(协程中唤起并等待子进程执行一段代码)
  • queue队列(协程间数据传递)

这篇博客只教给大家第1点,也就是把协程吃透,其他的内容都是小儿科的应用问题而已。

协程入门

假设有一个需求:每秒打印一次hello world,我们可以这样做:

这没什么问题,但是如果我想与此同时实现每隔2秒打印一次goodbye world呢?可见这种阻塞模型是没法实现并发任务调度的。

协程!

我们使用asyncio底层的eventloop来实现异步sleep,这样sleep不会阻塞线程,而是当要睡眠之前把线程执行权交还给eventloop:

在函数前加上async关键字,那么它就不是函数了,而是用来创建协程对象的方法。

co = print_hello()只是返回一个协程对象,并不会执行函数里的代码,什么时候会执行呢?把这个协程对象交给asyncio,让它去调度即可,run的内部会循环执行eventloop的调度循环。

在这个协程中,我们会创建一个sleep协程对象,然后再await等待它返回,因此await后面必须跟着另外一个协程,这样可以把当前协程的执行权交给子协程并等待其返回值。

当然,asyncio.sleep并不会阻塞线程,它已经与eventloop联动,作为一个事件循环的定时器存在,所以是纯异步的sleep。

协程并发

回到需求本身,我们要实现2个并发的协程:

  • 每隔1秒打印hello world
  • 每隔2秒打印goodbye world

这很简单:

利用async定义2个协程,然后分别创建协程对象,一起交给eventloop调度执行,调度会持续进行直到2个协程都return返回为止(当然,我们2个协程都是死循环,不会退出)。

gather只是把2个协程合并为逻辑上的1个协程,因为run_util_complete只能传1个协程参数进去,其效果就是等待2个协程都结束才算结束。

asyncio实现了2个协程的并发执行:

协程高级篇

理解上述内容,并不能让你随心所欲的实现代码逻辑,我来举一个例子。

我们有一个新需求:实现一个爬虫翻页下载器,它要通过不断翻页来完成对https://yuerblog.cc/1、https://yuerblog.cc/2、https://yuerblog.cc/3等等这些URL的抓取,希望越高效越好。

最简单的就是顺序抓取,不需要并发,代码这么写:

调度协程while死循环,创建download协程并且await等待它执行。

因为调度协程每次都要等待当前download协程完成,所以翻页过程从逻辑上是串行阻塞的,并不能对多个URL实现并发的下载。

怎么办呢?这就是asyncio协程的最重要的技术细节,那就是Task。

利用Task分离协程

上述代码的问题是schedule协程await了download协程,但我们并不希望schedule等待download,如何实现呢?

只需要把coroutine对象包装为Task,那么coroutine就会直接注册到eventloop中被asyncio接管,在下次事件循环时这个coroutine就会被独立调度执行,因此也就和schedule协程分离了。

代码略作调整,create_task包装了coroutine,返回一个task对象,我们暂时不需要使用task对象,create_task已经完成了coroutine注册到eventloop的工作,它会被后续调度执行的,与schedule协程没有任何关系。

但是运行这份代码,你不会看见任何输出,因为这个代码是有问题的。

问题原因是,schedule协程的死循环中是完全的CPU运算,没有任何调用点会主动让出线程执行权,因此线程已经被死循环占死,download协程没有机会得到调度。

为了让schedule协程能够有契机让出线程,我们在while循环里调用一次asyncio.sleep(0),这样eventloop才有机会暂停该协程,调度其他协程:

现在可以看到大量的日志刷屏,瞬间几万个下载协程就已经运行完成了:

跟踪Task返回值

因为我们没有await下载协程,所以并不能知道它的执行结果。

Task已经考虑到了这点,我们注册一个回调函数即可:

download协程下载完成后会return返回页面内容。

task注册了on_finish回调,asyncio会传入该结束的task到on_finish,我们可以调用task的result/exception获取协程的返回结果或者异常对象。

如果我们想向on_finish回调传入一些上下文数据,需要自己利用闭包的方式带入,利用python的funtiontools可以轻松做到:

利用partial方法闭包了一个url参数,因此可以在on_finish回调中得到url信息。

asyncio.ensure_future和asyncio.create_task用法和效果一样,在3.7版本之后官方手册建议大家只使用task即可。

关于asyncio协程开发的关键内涵就这么多,相信大家已经可以驾驭asyncio完成各种复杂应用了。

如果文章帮助您解决了工作难题,您可以帮我点击屏幕上的任意广告,或者赞助少量费用来支持我的持续创作,谢谢~