Celery是基于python开发的分布式任务队列,对于一些耗时的任务,我们可以把它交给celery异步处理,同时celery还可以做定时任务。
Celery在执行任务时,需要一个中间人(broker)来发送和接收任务消息。充当这个中间人角色的是 RabbitMQ 或者 Redis,本文将使用Redis作为broker。关于Redis的介绍和使用,可以参考下上一篇文章 django博客开发:redis的安装以及用django-redis作缓存
安装
本文使用的是celery 4.1.0,当前最新版本
pip install -U Celery
因为要用redis做中间人,所以可以用下面这种来安装celery和一些支持redis的组件
pip install "celery[redis]"
在django中使用celery
我们的django项目结构通常是这样的
- proj/
- manage.py
- proj/
- __init__.py
- settings.py
- urls.py
在proj/proj/ 下创建celery.py文件,用来定义一个celery实例。
proj/proj/celery.py
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery
# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')
app = Celery('proj')
# 这里定义了namespace='CELERY',在settings里所有跟celery有关的设置都有加上'CELERY_'前缀
app.config_from_object('django.conf:settings', namespace='CELERY')
# Load task modules from all registered Django app configs.
# 自动搜索所有注册的app下的tasks.py,所有celery处理的任务,都写在app下的tasks.py里
app.autodiscover_tasks()
@app.task(bind=True)
def debug_task(self):
print('Request: {0!r}'.format(self.request))
在proj/proj/__init__.py里引入该实例
from __future__ import absolute_import, unicode_literals
# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app
__all__ = ['celery_app']
接下来需要在proj/proj/settings.py里进行设置
# celery settings
# 使用redis作为中间人
CELERY_BROKER_URL = 'redis://127.0.0.1:6379/0'
# 任务的结果存储在redis
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/0'
关于RESULT_BACKEND,还可以使用django的ORM或者Cache来存储任务结果,需要用到一个扩展app, django-celery-results,使用也很简单
1. 安装 pip install django-celery-results
2. 加入到INSTALLED_APPS中
settings.py
INSTALLED_APPS = (
...,
'django_celery_results',
)
3. 数据库迁移 python manage.py migrate django_celery_results
4. 在settings.py里设置CELERY_RESULT_BACKEND
任务结果存储到数据库的话:
CELERY_RESULT_BACKEND = 'django-db'
存储到缓存中:
CELERY_RESULT_BACKEND = 'django-cache'
处理异步任务
Celery的功能直接就是异步处理,比如在以下场景:我做评论邮件通知功能时,按照逻辑,回复一条评论,需要发送邮件通知,而发邮件算是一个比较耗时的任务,每次提交评论后都会卡1~2秒,非常影响用户体验。
假设原来的代码是这样的:
blog/views.py
import time
# 发表评论的视图函数
def submit_comment(request):
...
...
# 发送邮件
send_mail(email)
return JsonResponse({'msg': '评论出错!'})
# 发送邮件的函数
def send_mail(email):
...
...
print('sending email...')
time.sleep(2) # 耗时两秒
print('finished')
return True
把send_mail这任务交给celery来处理, 在blog/下创建tasks.py
blog/tasks.py
from proj import celery_app # 这是我们之前定义的celery实例
import time
# 原来的发送邮件函数写在这里,并用task装饰器
@celery_app.task
def send_mail(*args, **kwargs):
...
...
print('sending email...')
time.sleep(2) # 耗时两秒
print('finished')
return True
blog/views.py
from .tasks import send_mail
# 发表评论的视图函数
def submit_comment(request):
...
...
# 发送邮件
send_mail.delay(email)
return JsonResponse({'msg': '评论出错!'})
注意:这里send_mail的参数必须是可json序列化的对象,不然会报错
运行任务
任务已经写好了,还需要启动celery才能运行任务
启动celery也很简单,在项目根目录下(跟我们运行 python manage.py runserver 一样),运行如下命令
celery -A proj worker -l info
注意:这里的proj就是我们之前定义celery实例时,你可以自己更改,比如我定义的是 app = Celery('django_blog')
正常运行后,终端里会显示如下:
运行celery之后,再打开一个终端,启动服务器
python manage.py runserver
尝试发送评论,不会再有之前的阻塞现象,终端的log如下:
每当celery执行一个任务,都会产生一个key,这个key对应的就是任务的结果,它存储在result_backend里,我们使用的是redis,可以去redis里查看一下
打开redis交互
/usr/local/bin/redis-cli
以上就是celery执行异步任务的介绍,下一篇文章将介绍celery定时炸弹任务。
注意:每当添加一个任务时,都需要重新启动celery,否则celery无法发现新添加的任务。
supervisor启动celery
在生产环境下,我们肯定不希望像上面介绍的那样手动开启celery,这里可以使用进程监控神器 supervisor,关于supervisor的使用,在上一篇文章 django博客开发:redis的安装以及用django-redis作缓存 中有介绍
在/etc/supervisor/conf.d 下创建 celery.conf,作为监控celery的配置文件
vim /etc/supervisor/conf.d/celery.conf
/etc/supervisor/conf.d/celery.conf
[program:celery]
; 这里是项目的路径
directory = /home/aaron/Desktop/sites/aaron-zhao.com/django_blog
; 启动celery的命令
command = /home/aaron/Desktop/sites/aaron-zhao.com/env/bin/celery -A django_blog worker -l info --logfile log/celery.log
; 进程数,默认是1
numprocs = 1
; 自动启动
autostart = true
; 自动重启
autorestart = true
NOTICE:这里启动celery的命令 /home/aaron/Desktop/sites/aaron-zhao.com/env/bin/ 是celery所在的路径,因为celery是安装在虚拟环境中的 env/ 就是虚拟环境。 --logfile log/celery.log 是配置celery的日志,日志的路径自行更改。