Celery学习指南

Celery学习指南

1. celery简介

Celery由Python编写的用来处理大量信息的分布式系统,通常使用它来实现异步任务(async task)和定时任务(crontab)。可以用Celery提供的接口快速实现并管理一个分布式任务队列。


Celery_framework

Celery主要有四个模块

1. 任务模块(Task)

包含异步任务与定时任务,异步任务通常在业务逻辑中被触发又给消息中间件处理,定时任务由Celery beat 进程周期性的将任务定时传递给消息中间件

2. 消息中间件(Broker)

任务调度队列,接收生产者发来的Task任务,并将其存入任务队列,Celery并不提供队列服务通常使用Redis和RabbitMQ。

  • 大量异步任务场景下,RabbitMQ执行效率和性能上比redis高。
  • 少量异步任务场景下,redis的执行效率和性能上比RabbitMQ高。

3. 任务执行单元(Worker)

Worker实时监控消息中间件Broker消息队列,获取队列中调度的任务并执行。

4. 结果存储(Backend)

存储Worker执行的结果,队列中的任务运行完后的结果或者状态需要被任务发送者知道,那么就需要一个地方储存这些结果,通常使用Redis和RabbitMQ或其他

2. 快速入门

1.安装celery与消息中间件这里使用redis

pip install celery[redis]

2.创建任务与celery实例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# tasks.py
import time
from celery import Celery

broker = 'redis://127.0.0.1:6379/0'
backend = 'redis://127.0.0.1:6379/0'
app = Celery('my_task', broker=broker, backend=backend)
# Celery 的第一个参数是当前模块的名称
# broker中间人关键字参数,指定你所使用的消息中间人的 URL
# backend存储任务结果的地方,指定存储用 redis
@app.task # 当函数被 @app.task 装饰后,就成为可被 Celery 调度的任务
def add(x, y):
time.sleep(5) # 模拟耗时操作
return x + y

3.启动Celery Worker

1
celery worker -A tasks --loglevel=info

注:参数 -A 指定了 Celery 实例的位置;
参数 –loglevel 指定日志级别,默认为 warning,也可以使用 -l info 来表示;

4.程序调用任务

1
2
3
4
5
6
7
8
9
# client.py
# 从 tasks.py 文件中导入了 add 任务对象
from tasks import add

r = add.delay(2, 8) # 使用 delay()调度任务
print(f'结果为:{r.get()}')
print('我先执行了')

# delay() 方法将任务发送到消息中间件(Broker),Celery Worker 进程监控到该任务后,就会进行执行

运行命令python client.py,可以看到,虽然任务函数 add 需要等待 5 秒才返回执行结果,但由于它是一个异步任务,不会阻塞当前的主程序,因此主程序会往下执行 print 语句,打印出结果。

3. 项目中使用Celery

在项目中使用celery时,通常的做法是将一些配置写入到一个配置文件中,命名为celeryconfig.py

通过实例理解,项目结构如下

1
2
3
4
5
6
7
celery_learning                    # 项目根目录
├── celery_app # 存放 celery 相关文件
│ ├── __init__.py
│ ├── celeryconfig.py # 配置文件
│ ├── task1.py # 任务文件 1
│ └── task2.py # 任务文件 2
└── client.py # 应用程序

3.1 异步任务

__init__.py

1
2
3
4
5
# __init__.py
from celery import Celery

app = Celery('c_name') # 创建 Celery 实例
app.config_from_object('celery_app.celeryconfig') # 通过 Celery 实例加载配置模块

celeryconfig.py

1
2
3
4
5
6
7
8
9
10
# celeryconfig.py
BROKER_URL = 'redis://127.0.0.1:6379' # 指定 Broker
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/0' # 指定 Backend

CELERY_TIMEZONE='Asia/Shanghai' # 指定时区,默认是 UTC

CELERY_IMPORTS = ( # 指定导入的任务模块
'celery_app.task1',
'celery_app.task2'
)

task1.py与task2.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# task1.py
import time
from celery_app import app

@app.task
def add(x, y):
time.sleep(5) # 模拟耗时操作
return x + y

# task2.py
import time
from celery_app import app

@app.task
def multiply(x, y):
time.sleep(2)
return x * y

client.py

1
2
3
4
5
6
7
8
9
# client.py
from celery_app import task1
from celery_app import task2

t1_result = task1.add.delay(5, 5) # 也可用 task1.add.apply_async(args=[5, 5])
t2_result =task2.multiply.delay(6, 2) # 也可用 task2.multiply.apply_async(args=[6, 2])
print(f't1结果为:{t1_result.get()}')
print(f't2结果为:{t2_result.get()}')
print('我先执行了')

启动worker

celery -A celery_app worker -loglevel=info

1
2
3
4
5
celery -A 项目名 worker -loglevel=info : 前台启动命令
celery multi start w1 -A 项目名 -l info : 后台启动命令
celery multi restart w1 -A 项目名 -l info : 后台重启命令
celery multi stop w1 -A 项目名 -l info : 后台停止命令
注:后台启动celery4之后,windows平台不适用

3.2 定时任务

Celery Beat 进程通过读取配置文件的内容,周期性地将定时任务发往任务队列。

修改celeryconfig.py配置文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# celeryconfig.py
from datetime import timedelta
from celery.schedules import crontab

# Broker and Backend
BROKER_URL = 'redis://127.0.0.1:6379/0'
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/0'

# Timezone
CELERY_TIMEZONE='Asia/Shanghai' # 指定时区,不指定默认为 'UTC'
# CELERY_TIMEZONE='UTC'

# 任务
CELERY_IMPORTS = (
'celery_app.task1',
'celery_app.task2'
)

# schedules
CELERYBEAT_SCHEDULE = {
'add-every-30-seconds': {
'task': 'celery_app.task1.add',
'schedule': timedelta(seconds=30), # 每 30 秒执行一次
'args': (5, 5) # 任务函数参数
},
'multiply-at-some-time': {
'task': 'celery_app.task2.multiply',
'schedule': crontab(hour=9, minute=50), # 每天早上 9 点 50 分执行一次
'args': (2, 6) # 任务函数参数
}
}

启动Worker进程

celery -A celery_app worker -loglevel=info

启动Beat进程

celery -A celery_app beat

更复杂的定时配置

1
2
3
4
5
6
7
8
9
10
from celery.schedules import crontab

app.conf.beat_schedule = {
# 每周一早上7:30执行
'add-every-monday-morning': {
'task': 'tasks.add',
'schedule': crontab(hour=7, minute=30, day_of_week=1),
'args': (),
},
}

定时配置参考

Example Meaning
crontab() 每分钟执行一次
crontab(minute=0, hour=0) 每天午夜执行
crontab(minute=0, hour=’*/3’) 每三小时执行一次:凌晨3点、6点、9点、中午3点、下午6点、晚上9点。
crontab(minute=0,hour=’0,3,6,9,12,15,18,21’) 同上
crontab(minute=’*/15’) 每15分钟执行一次
crontab(day_of_week=’sunday’) 每分钟执行一次,在星期天
crontab(minute=’‘,hour=’‘, day_of_week=’sun’) 同上
crontab(minute=’*/10’,hour=’3,17,22’, day_of_week=’thu,fri’) 每十分钟执行一次,但只能在周四或周五的凌晨3-4点、下午5-6点和晚上10-11点之间执行
crontab(minute=0, hour=’/2,/3’) 每偶数小时执行一次,每小时可被三整除。这意味着:除了:凌晨1点、5点、7点、11点、下午1点、下午5点、晚上7点、晚上11点以外,每小时
crontab(minute=0, hour=’*/5’) 执行可被5整除的小时数。这意味着它是在下午3点触发的,而不是下午5点(因为下午3点等于24小时时钟值“15”,可以被5整除)。
crontab(minute=0, hour=’*/3,8-17’) 每小时执行一次,可被3整除,办公时间(上午8点到下午5点)每小时执行一次
crontab(day_of_month=’2’) 每月第二天执行
crontab(day_of_month=’2-30/3’) 每偶数天执行一次
crontab(day_of_month=’1-7,15-21’) 每月第一周和第三周执行
crontab(day_of_month=’11’,month_of_year=’5’) 每年5月11日执行
crontab(month_of_year=’*/3’) 每季度第一个月执行

4. Django项目中使用Celery

可以直接使用Celery,同时python还提供了django-celery模块

本实例使用django-celery

pip install django-celery

pip install redis==2.10.6

1
2
3
4
5
6
7
8
9
10
11
django_celery                    # 项目根目录
├── celery_app # 异步应用
│ ├── __init__.py
│ ├── tasks.py # 任务文件
├── django_celery
│ ├── __init__.py
│ ├── celery.py # celery文件
│ ├── settings.py # django配置文件
│ ├── urls.py
│ └── wsgi.py
└── manage.py # 应用程序

4.1 在应用中新建tasks任务

tasks.py

1
2
3
4
5
6
7
8
9
# tasks.py
from __future__ import absolute_import, unicode_literals
import time
from celery import shared_task

@shared_task
def add(x, y):
time.sleep(5)
return x + y

4.2 在settings.py 的同级目录下添加一个 celery.py文件

celery.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# celery.py
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery
from django.conf import settings

# 设置环境变量
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "django_celery.settings")

# 创建celery应用
app = Celery('django_celery')
app.config_from_object('django.conf:settings')

# 如果在工程的应用中创建了 tasks.py 模块,那么Celery应用就会自动去检索创建的任务。
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)

__init__.py

1
2
3
# __init__.py
from __future__ import absolute_import
from .celery import app as celery_app

4.3 在settings配置文件中注册应用,并加入celery配置

settings.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# settings.py
INSTALLED_APPS = [
# ...
'djcelery',
'celery_app',
]

import djcelery
djcelery.setup_loader()
CELERY_TIMEZONE = 'Asia/Shanghai'
BROKER_URL = 'redis://127.0.0.1:6379/0'
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/0'
CELERY_IMPORTS = ('celery_app.tasks',)
CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler'

4.4 启动worker

1
2
celery -A django_celery worker -l info   # 以交互方式启动
celery multi stop w1 -A first_django_app -l info # 以守护进程方式启动

4.5 测试

python manage.py shell

1
2
3
4
5
6
7
>>> from celery_app.tasks import add
>>> add.delay(5,5)
<AsyncResult: 331a736b-2569-456b-beb9-1f87be17d32b>
>>> from celery.result import AsyncResult
>>> res = AsyncResult("331a736b-2569-456b-beb9-1f87be17d32b")
>>> res.result
10

查看redis客户端:

1614350818692

5.异步发送邮件小实例

settings里配置邮箱的一些参数

1
2
3
4
5
6
7
8
EMAIL_BACKEND = 'django.core.mail.backends.smtp.EmailBackend'
EMAIL_USE_TLS = True # 是否使用TLS安全传输协议(用于在两个通信应用程序之间提供保密性和数据完整性。)
EMAIL_USE_SSL = False # 是否使用SSL加密,qq企业邮箱要求使用
EMAIL_HOST = 'smtp.163.com' # 发送邮件的邮箱 的 SMTP服务器,这里用了163邮箱
EMAIL_PORT = 25 # 发件箱的SMTP服务器端口
EMAIL_HOST_USER = 'xxxxx@163.com' # 发送邮件的邮箱地址
EMAIL_HOST_PASSWORD = 'HDFUIANJKFDFI' # 发送邮件的邮箱密码(这里使用的是授权码)
EMAIL_TO_USER_LIST = ['xxxxx@163.com'] # 此字段是可选的,用来配置收件人列表

邮件发送代码,放到 celery_app/utils.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
from django.core.mail import send_mail
from django.conf import settings

def send_email(subject,message):
send_mail(subject,
message,
settings.EMAIL_HOST_USER,
settings.EMAIL_TO_USER_LIST,)
print('邮件发送成功!')

# subject 邮件标题
# message 普通邮件正文, 普通字符串
# from_email 发件人
# recipient_list 收件人列表,这里一定要是列表
# html_message 多媒体邮件正文,可以是html字符串

编写异步发送邮件的 task

1
2
3
4
5
6
7
# tasks.py
from .utils import send_email
@shared_task
def send_email_task(subject, message):
time.sleep(5)
send_email(subject, message)
return True

视图函数,使用异步处理

1
2
3
4
5
6
7
8
9
# views.py
from django.http import HttpResponse
from celery_app.tasks import send_email_task
def send_email(request, *args, **kwargs):
try:
res = send_email_task.delay('这是来自django-celery的测试','这是一次celery异步邮件测试')
except Exception as e:
print('异常:{}'.format(str(e)))
return HttpResponse('发送邮件成功,请耐心等待')

创建路由,主路由URLConf 配置,添加celery_app应用的映射入口

1
2
3
4
5
from django.urls import path,include

urlpatterns = [
path('', include('celery_app.urls'))
]

celery_app下新建urls.py文件

1
2
3
4
5
from django.urls import path
from celery_app import views
urlpatterns = [
path('send_email/', views.send_email),
]

最后启动项目,输入指定url,如http://127.0.0.1:8000/send_email/

此时请求会立马返回,提示邮件发送成功,而任务会交给 celery 去执行,并最后将结果放到 redis 服务中,邮件通过异步的方式发到指定邮箱

1614354566951

都看到这里了,不赏点银子吗^v^