Django&Celery是怎么玩的?

celery是python的一套异步任务处理框架,可以让我们聚焦在编写业务逻辑,而不是底层异步实现。

celery为django框架提供了专用的集成方法,实施过程只需要根据官方手册即可搞定。

下面,我将总结我实施Django+Celery过程中的经验和思路,帮助大家理清脉络,快速上手。

安装Celery

依赖的版本是:celery==4.3.0。

后续所有配置流程,均基于官方手册的指导:https://docs.celeryproject.org/en/latest/django/

创建celery.py

在settings.py同级目录下创建celery.py,它是celery worker(异步处理进程)的入口:

后续我们会用celery worker程序拉起这个入口文件,该文件初始化好一个叫做app的celery对象即可。

celery worker是celery提供的一个python程序,它的作用是消费rabbitmq中的任务,并执行这些任务对应的函数。

创建tasks.py

我们可以在项目下的任意app内,创建一个tasks.py文件,上述的autodiscover_tasks()就是遍历django中注册的所有app,并在app目录下找到tasks.py模块,然后遍历模块中的所有异步任务函数。

我们的异步任务逻辑就写在tasks.py中,例如我在demo应用中创建的tasks.py实现了一个异步任务add:

该任务接受2个参数,模拟执行了60秒,最后返回一个结果,当然这个结果通常来说是没用的,因为这是一个异步的任务(Celery提供了result扩展,可以把结果放在redis之类的存储里,前台可以来获取,这个不常用,大家自己学习)。

异步任务需要用shared_task装饰器包装,这样celery在扫描函数的时候才知道这是一个异步任务,而不是一个普通函数。

生成异步任务

当我们在tasks.py中实现了异步任务的处理逻辑后,我们就需要想一下该如何投递一个任务呢?

因为我们的add方法经过了shared_task包装,因此呢它不再是一个普通函数,它还提供了很多额外的属性。

这是一个views.py中的前台接口,它调用了tasks.py中的add方法的delay方法。

我们知道python中一切皆是对象,所以add function也可以拥有delay属性,通过调用delay属性就可以投递一个异步任务到rabbitmq中。

注意,这里的5,6是add任务的传参,它会一起被打包到消息中,一起放入rabbitmq中。

配置celery

OK,我们会投递任务,也写好了任务逻辑,同时worker的入口程序也写好了。

接下来的问题是:任务序列化后该投递到哪里。

这就需要配置一下celery了,我们在celery worker的入口文件中,令celery app对象加载了django的settings.py中CELERY开头的配置项,因此我们就去编辑django的settings.py:

我用到了上述的配置项,通常我们也不需要更复杂的配置了。

关键在于:

  • rabbitmq地址是什么?
  • worker要启动多少个子进程并发处理任务,等等…
  • 要求worker必须在任务成功后才向rabbitmq做ack
  • 生产者投递的任务要进入哪个rabbitmq队列(上述例子是所有类型的任务都进入my-common-queue队列,具体配置方法自行学习)

启动worker

任务投递到rabbitmq后,我们就差启动worker了。

我们需要进入到django的项目根目录(包含manage.py的目录),然后启动celery worker:

这里apps是包含celery.py以及settings.py的包名,这样celery会在apps包下加载celery.py文件作为入口。

-Q指定worker消费rabbitmq的哪个队列来获取任务,可以传多个。

我们可以想象celery的实现原理:生产者在调用add.delay的时候,会将异步函数所在的模块名、函数名、传参序列化下来,投递到rabbitmq;worker将项目根目录加入到sys.path中,在worker端取出消息后,可以再次import引入异步函数的模块,找到函数,然后传参执行该函数。(尝试在程序中打印一下全局变量__package__,以及打印任意函数的__name__属性)

celery worker的架构默认是父子进程模式,父进程与rabbitmq通讯以及保持心跳,子进程执行单个任务,同时可以有很多子进程并发执行任务,任务可以执行很长的时间,只要不超过我们在settings.py中定义的超时时间就不会被杀死。

其他

遇到的另外一个问题,就是celery worker命令放到systemd中的时候,发现无法取到任务,很是奇怪。

官方给的解决方法是利用celery multi命令来拉起celery worker,直接按照官方的方法去做就好了,我就不贴示例了:http://docs.celeryproject.org/en/latest/userguide/daemonizing.html

如果文章帮助您解决了工作难题,您可以帮我点击屏幕上的任意广告,或者赞助少量费用来支持我的持续创作,谢谢~