Celery学习指南 1. celery简介 Celery由Python编写的用来处理大量信息的分布式系统,通常使用它来实现异步任务(async task)和定时任务(crontab)。可以用Celery提供的接口快速实现并管理一个分布式任务队列。
Celery主要有四个模块
1. 任务模块(Task)
包含异步任务与定时任务,异步任务通常在业务逻辑中被触发又给消息中间件处理,定时任务由Celery beat 进程周期性的将任务定时传递给消息中间件
2. 消息中间件(Broker)
任务调度队列,接收生产者发来的Task任务,并将其存入任务队列,Celery并不提供队列服务通常使用Redis和RabbitMQ。
大量异步任务场景下,RabbitMQ执行效率和性能上比redis高。
少量异步任务场景下,redis的执行效率和性能上比RabbitMQ高。
3. 任务执行单元(Worker)
Worker实时监控消息中间件Broker消息队列,获取队列中调度的任务并执行。
4. 结果存储(Backend)
存储Worker执行的结果,队列中的任务运行完后的结果或者状态需要被任务发送者知道,那么就需要一个地方储存这些结果,通常使用Redis和RabbitMQ或其他
2. 快速入门 1.安装celery与消息中间件这里使用redis
pip install celery[redis]
2.创建任务与celery实例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 import timefrom celery import Celerybroker = 'redis://127.0.0.1:6379/0' backend = 'redis://127.0.0.1:6379/0' app = Celery('my_task' , broker=broker, backend=backend) @app.task def add (x, y ): time.sleep(5 ) return x + y
3.启动Celery Worker
1 celery worker -A tasks --loglevel=info
注:参数 -A 指定了 Celery 实例的位置; 参数 –loglevel 指定日志级别,默认为 warning,也可以使用 -l info 来表示;
4.程序调用任务
1 2 3 4 5 6 7 8 9 from tasks import addr = add.delay(2 , 8 ) print(f'结果为:{r.get()} ' ) print('我先执行了' )
运行命令python client.py,可以看到,虽然任务函数 add 需要等待 5 秒才返回执行结果,但由于它是一个异步任务,不会阻塞当前的主程序,因此主程序会往下执行 print 语句,打印出结果。
3. 项目中使用Celery 在项目中使用celery时,通常的做法是将一些配置写入到一个配置文件中,命名为celeryconfig.py。
通过实例理解,项目结构如下
1 2 3 4 5 6 7 celery_learning # 项目根目录 ├── celery_app # 存放 celery 相关文件 │ ├── __init__.py │ ├── celeryconfig.py # 配置文件 │ ├── task1.py # 任务文件 1 │ └── task2.py # 任务文件 2 └── client.py # 应用程序
3.1 异步任务 __init__.py
1 2 3 4 5 from celery import Celeryapp = Celery('c_name' ) app.config_from_object('celery_app.celeryconfig' )
celeryconfig.py
1 2 3 4 5 6 7 8 9 10 BROKER_URL = 'redis://127.0.0.1:6379' CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/0' CELERY_TIMEZONE='Asia/Shanghai' CELERY_IMPORTS = ( 'celery_app.task1' , 'celery_app.task2' )
task1.py与task2.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 import timefrom celery_app import app@app.task def add (x, y ): time.sleep(5 ) return x + y import timefrom celery_app import app@app.task def multiply (x, y ): time.sleep(2 ) return x * y
client.py
1 2 3 4 5 6 7 8 9 from celery_app import task1from celery_app import task2t1_result = task1.add.delay(5 , 5 ) t2_result =task2.multiply.delay(6 , 2 ) print(f't1结果为:{t1_result.get()} ' ) print(f't2结果为:{t2_result.get()} ' ) print('我先执行了' )
启动worker
celery -A celery_app worker -loglevel=info
1 2 3 4 5 celery -A 项目名 worker -loglevel =info : 前台启动命令 celery multi start w1 -A 项目名 -l info : 后台启动命令 celery multi restart w1 -A 项目名 -l info : 后台重启命令 celery multi stop w1 -A 项目名 -l info : 后台停止命令 注:后台启动celery4之后,windows平台不适用
3.2 定时任务 Celery Beat 进程通过读取配置文件的内容,周期性地将定时任务发往任务队列。
修改celeryconfig.py配置文件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 from datetime import timedeltafrom celery.schedules import crontabBROKER_URL = 'redis://127.0.0.1:6379/0' CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/0' CELERY_TIMEZONE='Asia/Shanghai' CELERY_IMPORTS = ( 'celery_app.task1' , 'celery_app.task2' ) CELERYBEAT_SCHEDULE = { 'add-every-30-seconds' : { 'task' : 'celery_app.task1.add' , 'schedule' : timedelta(seconds=30 ), 'args' : (5 , 5 ) }, 'multiply-at-some-time' : { 'task' : 'celery_app.task2.multiply' , 'schedule' : crontab(hour=9 , minute=50 ), 'args' : (2 , 6 ) } }
启动Worker进程
celery -A celery_app worker -loglevel=info
启动Beat进程
celery -A celery_app beat
更复杂的定时配置
1 2 3 4 5 6 7 8 9 10 from celery.schedules import crontabapp.conf.beat_schedule = { 'add-every-monday-morning' : { 'task' : 'tasks.add' , 'schedule' : crontab(hour=7 , minute=30 , day_of_week=1 ), 'args' : (), }, }
定时配置参考
Example
Meaning
crontab()
每分钟执行一次
crontab(minute=0, hour=0)
每天午夜执行
crontab(minute=0, hour=’*/3’)
每三小时执行一次:凌晨3点、6点、9点、中午3点、下午6点、晚上9点。
crontab(minute=0,hour=’0,3,6,9,12,15,18,21’)
同上
crontab(minute=’*/15’)
每15分钟执行一次
crontab(day_of_week=’sunday’)
每分钟执行一次,在星期天
crontab(minute=’‘,hour=’ ‘, day_of_week=’sun’)
同上
crontab(minute=’*/10’,hour=’3,17,22’, day_of_week=’thu,fri’)
每十分钟执行一次,但只能在周四或周五的凌晨3-4点、下午5-6点和晚上10-11点之间执行
crontab(minute=0, hour=’/2, /3’)
每偶数小时执行一次,每小时可被三整除。这意味着:除了:凌晨1点、5点、7点、11点、下午1点、下午5点、晚上7点、晚上11点以外,每小时
crontab(minute=0, hour=’*/5’)
执行可被5整除的小时数。这意味着它是在下午3点触发的,而不是下午5点(因为下午3点等于24小时时钟值“15”,可以被5整除)。
crontab(minute=0, hour=’*/3,8-17’)
每小时执行一次,可被3整除,办公时间(上午8点到下午5点)每小时执行一次
crontab(day_of_month=’2’)
每月第二天执行
crontab(day_of_month=’2-30/3’)
每偶数天执行一次
crontab(day_of_month=’1-7,15-21’)
每月第一周和第三周执行
crontab(day_of_month=’11’,month_of_year=’5’)
每年5月11日执行
crontab(month_of_year=’*/3’)
每季度第一个月执行
4. Django项目中使用Celery 可以直接使用Celery,同时python还提供了django-celery模块
本实例使用django-celery
pip install django-celery
pip install redis==2.10.6
1 2 3 4 5 6 7 8 9 10 11 django_celery # 项目根目录 ├── celery_app # 异步应用 │ ├── __init__.py │ ├── tasks.py # 任务文件 ├── django_celery │ ├── __init__.py │ ├── celery.py # celery文件 │ ├── settings.py # django配置文件 │ ├── urls.py │ └── wsgi.py └── manage.py # 应用程序
4.1 在应用中新建tasks任务
tasks.py
1 2 3 4 5 6 7 8 9 from __future__ import absolute_import, unicode_literalsimport timefrom celery import shared_task@shared_task def add (x, y ): time.sleep(5 ) return x + y
4.2 在settings.py 的同级目录下添加一个 celery.py文件
celery.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 from __future__ import absolute_import, unicode_literalsimport osfrom celery import Celeryfrom django.conf import settingsos.environ.setdefault("DJANGO_SETTINGS_MODULE" , "django_celery.settings" ) app = Celery('django_celery' ) app.config_from_object('django.conf:settings' ) app.autodiscover_tasks(lambda : settings.INSTALLED_APPS)
__init__.py
1 2 3 from __future__ import absolute_importfrom .celery import app as celery_app
4.3 在settings配置文件中注册应用,并加入celery配置
settings.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 INSTALLED_APPS = [ 'djcelery' , 'celery_app' , ] import djcelerydjcelery.setup_loader() CELERY_TIMEZONE = 'Asia/Shanghai' BROKER_URL = 'redis://127.0.0.1:6379/0' CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/0' CELERY_IMPORTS = ('celery_app.tasks' ,) CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler'
4.4 启动worker
1 2 celery -A django_celery worker -l info celery multi stop w1 -A first_django_app -l info
4.5 测试
python manage.py shell
1 2 3 4 5 6 7 >>> from celery_app.tasks import add >>> add.delay(5 ,5 ) <AsyncResult: 331 a736b-2569 -456 b-beb9-1 f87be17d32b> >>> from celery.result import AsyncResult >>> res = AsyncResult("331 a736b-2569 -456 b-beb9-1 f87be17d32b") >>> res.result 10
查看redis客户端:
5.异步发送邮件小实例 settings里配置邮箱的一些参数
1 2 3 4 5 6 7 8 EMAIL_BACKEND = 'django.core.mail.backends.smtp.EmailBackend' EMAIL_USE_TLS = True EMAIL_USE_SSL = False EMAIL_HOST = 'smtp.163.com' EMAIL_PORT = 25 EMAIL_HOST_USER = 'xxxxx@163.com' EMAIL_HOST_PASSWORD = 'HDFUIANJKFDFI' EMAIL_TO_USER_LIST = ['xxxxx@163.com' ]
邮件发送代码,放到 celery_app/utils.py 中
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 from django.core.mail import send_mailfrom django.conf import settingsdef send_email (subject,message ): send_mail(subject, message, settings.EMAIL_HOST_USER, settings.EMAIL_TO_USER_LIST,) print('邮件发送成功!' )
编写异步发送邮件的 task
1 2 3 4 5 6 7 from .utils import send_email@shared_task def send_email_task (subject, message ): time.sleep(5 ) send_email(subject, message) return True
视图函数,使用异步处理
1 2 3 4 5 6 7 8 9 from django.http import HttpResponsefrom celery_app.tasks import send_email_taskdef send_email (request, *args, **kwargs ): try : res = send_email_task.delay('这是来自django-celery的测试' ,'这是一次celery异步邮件测试' ) except Exception as e: print('异常:{}' .format (str (e))) return HttpResponse('发送邮件成功,请耐心等待' )
创建路由,主路由URLConf 配置,添加celery_app应用的映射入口
1 2 3 4 5 from django.urls import path,includeurlpatterns = [ path('' , include('celery_app.urls' )) ]
celery_app下新建urls.py文件
1 2 3 4 5 from django.urls import pathfrom celery_app import viewsurlpatterns = [ path('send_email/' , views.send_email), ]
最后启动项目,输入指定url,如http://127.0.0.1:8000/send_email/
此时请求会立马返回,提示邮件发送成功,而任务会交给 celery 去执行,并最后将结果放到 redis 服务中,邮件通过异步的方式发到指定邮箱