- Celery介绍
Celery是一个功能完备即插即用的异步任务队列系统。它适用于异步处理问题,当发送邮件、或者文件上传, 图像处理等等一些比较耗时的操作,我们可将其异步执行,这样用户不需要等待很久,提高用户体验。
简单,易于使用和维护,有丰富的文档。
高效,单个celery进程每分钟可以处理数百万个任务。
灵活,celery中几乎每个部分都可以自定义扩展。
总之一句话,快
一. 官方文档
中文文档http://docs.jinkan.org/docs/celery/getting-started/index.html ;
二. 流程图
三. 异步回调使用
1.安装
-U是update的意思,有就进行更新,没有就安装,后面单独将celery运行起来就可以了
pip install redis==3.4.1 -i https://pypi.douban.com/simple/
pip install celery==4.4.2 -i https://pypi.douban.com/simple/
2. 使用目录介绍.
a. 使用celery第一件要做的最为重要的事情是需要先创建一个Celery实例,我们一般叫做celery应用,或者更简单直接叫做一个app。
b. app应用是我们使用celery所有功能的入口,比如创建任务,管理任务等,在使用celery的时候,app必须能够被其他的模块导入。
c. 一般celery任务目录直接放在项目的根目录下即可,路径:
├── mycelery/
├── config.py # 配置文件
├── __init__.py
├── main.py # 主程序
└── sms/ # 一个目录可以放置多个任务,该目录下存放当前任务执行时需要的模块或依赖,也可以每个任务单独一个目录
└── tasks.py # 任务的文件,名称必须是这个!!!
3. 主要文件
1.main.py
主程序
import os
from celery import Celery
创建celery实例对象
app = Celery("dbj") # celery对象可以创建多个,所以我们最好给我们当前的celery应用起个名字,比如叫做dbj
把celery和django进行组合,需要识别和加载django的配置文件
import os
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'dbj.settings.dev')
如果只是使用了logging日志功能的话可以不写以下两句,因为logging是python提供的模块,但是将来可能使用celery来执行其他的django任务,所以我们先写上
import django
django.setup()
通过app对象加载配置
app.config_from_object("mycelery.config")
加载任务
参数必须必须是一个列表,里面的每一个任务都是任务的路径名称
app.autodiscover_tasks(["任务1","任务2"])
app.autodiscover_tasks(["mycelery.sms",])
2. config.py
配置文件
1. 有redis有密码
任务队列的链接地址(变量名必须叫这个)
broker_url = 'redis://:foobared@127.0.0.1:6379/14'
结果队列的链接地址(变量名必须叫这个)
result_backend = 'redis://:foobared@127.0.0.1:6379/15'
2. redis无密码写法
任务队列的链接地址(变量名必须叫这个)
broker_url = 'redis://127.0.0.1:6379/14'
结果队列的链接地址(变量名必须叫这个)
result_backend = 'redis://127.0.0.1:6379/15'
3. 创建一个任务文件sms/tasks.py,并创建任务,代码:
celery的任务必须写在tasks.py的文件中,别的文件名称不识别!!!
from mycelery.main import app
@app.task(name="send_sms") # name表示设置任务的名称,如果不填写,则默认使用函数名(路径)做为任务名
def send_sms():
print("发送短信!!!")
@app.task # name表示设置任务的名称,如果不填写,则默认使用函数名做为任务名
def send_sms2():
print("发送短信任务2!!!")
3.1 创建任务案例
from mycelery.main import app
from lyapi.libs.tencent.sms import send_sms_single
任务列表,在main里面会被自动发现,但不会执行,只有send_sms.delay(code, phone) 才执行
@app.task(name='send_sms') # 给任务起的别名叫send_sms
def send_sms(phone_num, template_id, template_param_list):
ret = send_sms_single(phone_num, template_id, template_param_list)
# ret 还是成功信息/错误信息
return ret
4. 控制台启动
启动Celery的命令
切换目录到mycelery根目录下启动
celery -A mycelery.main worker --loglevel=info
5. 最后一步,views使用
import datetime
from auc_celery.sms.tasks import send_sms
celery -A auc_celery.main worker -l info -P eventlet
def text(request):
"""生成id"""
# 1.立即执行
res = send_sms.delay(1, 2)
"""
# 2.定时任务(只执行一次,不是周期性定时任务)
ctime = datetime.datetime.now()
# 本地时间转换成utc时间
utc_ctime = datetime.datetime.utcfromtimestamp(ctime.timestamp())
target_time = utc_ctime + datetime.timedelta(seconds=3)
res = send_sms.apply_async(args=[6, 3], eta=target_time)
"""
return HttpResponse(res.id)
def text2(request):
"""根据id找任务结果及相关"""
nid = request.GET.get('nid')
# 导入包
from celery.result import AsyncResult
# 导入app
from auc_celery.sms.tasks import app
# 导入内置的函数,获取任务对象
result_object = AsyncResult(id=nid, app=app)
# 获取返回结果
# data1 = result_object.get() # 3
# 获取状态码
# status = result_object.status # SUCCESS
# # 取消任务
# result_object.revoke()
# print('success', result_object.successful())
# print(data1)
if result_object.successful():
# print('success', result_object.successful())
# 获取结果
data = result_object.get()
print("data", data)
# 清空
# result_object.forget()
# 失败的逻辑处理
elif result_object.failed():
print('失败')
else:
print('未知')
return HttpResponse('...')
周期定时器
celery -A auc_celery.main beat
celery启动 -A--->文件名 worker--->启动工作者 -l info--->日志等级 -P eventlet--->解决在windows上报错的问题
celery -A auc_celery.main worker -l info -P eventlet
"""
小贴士: 如果需要celery保存上次任务运行的时间在数据文件中,通过 -s 指定一个名为celerybeat-schedule的文件即可:
celery -A proj beat -s /home/celery/var/run/celerybeat-schedule
"""
四. 如果报以下错误
Celery ValueError: not enough values to unpack (expected 3, got 0)
报错原因:
win10上运行celery4.x以上的版本就会出现这个问题
解决办法:
1.安装一个eventlet模块
pip3 install eventlet -i https://pypi.douban.com/simple/
2. 然后启动celery的时候加一个参数
celery -A worker -l info -P eventlet
启动案例:
celery -A mycelery.main worker -l info -P eventlet
输入到文件
celery -A cat_celery.main worker -l info -P eventlet --logfile=D:/pythonProject/cat/logs/celery.log
ps:
详细描述错误原因
错误详细介绍https://blog.csdn.net/qq_52385631/article/details/122786364?spm=1001.2014.3001.5501 ;
五. 定时器使用(配置同上,)
定时器官方文档http://docs.celeryproject.org/en/latest/userguide/periodic-tasks.html ;
a. 在clery文件夹里面创建一个包,order
b. 在config.py配置中生成定时器
配置文件
1. 有redis有密码
任务队列的链接地址(变量名必须叫这个)
broker_url = 'redis://:foobared@127.0.0.1:6379/14'
结果队列的链接地址(变量名必须叫这个)
result_backend = 'redis://:foobared@127.0.0.1:6379/15'
2. redis无密码写法
任务队列的链接地址(变量名必须叫这个)
broker_url = 'redis://127.0.0.1:6379/14'
结果队列的链接地址(变量名必须叫这个)
result_backend = 'redis://127.0.0.1:6379/15'
定时器的配置文件
导入定时器
from celery.schedules import crontab
from .main import app
定时任务的调度列表,用于注册定时任务
app.conf.beat_schedule = {
# Executes every Monday morning at 7:30 a.m.
# check_order_outtime -- 随意起的名字 定时任务的名字,叫什么都都行
'check_order_outtime': {
# 本次调度的任务
'task': 'check_order', # 这里的任务名称必须先到main.py中注册,如果写了别名,直接写别名就可以了
# 定时任务的调度周期
# 'schedule': crontab(minute=0, hour=0), # 每周凌晨00:00
'schedule': crontab(), # 每分钟,没写秒数,那么就是每分钟的0秒开始
# 'args': (16, 16), # 注意:任务就是一个函数,所以如果有参数则需要传递
},
}
c. 定时器执行的任务 tasks.py(文件一定要叫这个名字)
from mycelery.main import app
from order.models import Order
import datetime
@app.task(name="check_order")
def check_order():
# 进行订单时间的校验
# 支付时间 < 当前时间 + 时间段 ,把订单状态进行修改
# 我们配置的1分钟执行1次
return 'ok'
定时器执行的任务案例
from mycelery.main import app
from order.models import Order
import datetime
@app.task(name="check_order")
def check_order():
# 进行订单时间的校验
# 支付时间 < 当前时间 + 时间段 ,把订单状态进行修改
timer = datetime.datetime.now() + datetime.timedelta(days=1)
Order.objects.filter(pay_time__lte=timer, order_status=0).update(order_status=3)
return 'ok'
d. main.py 配置自动发现
app.autodiscover_tasks(["mycelery.sms", 'mycelery.order'])
完整版 main.py
主程序
import os
from celery import Celery
创建celery实例对象
app = Celery("xxxx") # celery对象可以创建多个,所以我们最好给我们当前的celery应用起个名字,比如叫做xxx
把celery和django进行组合,需要识别和加载django的配置文件
import os
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'lyapi.settings.dev')
如果只是使用了logging日志功能的话可以不写以下两句,因为logging是python提供的模块,但是将来可能使用celery来执行其他的django任务,所以我们先写上
import django
django.setup()
通过app对象加载配置
app.config_from_object("mycelery.config")
加载任务
参数必须必须是一个列表,里面的每一个任务都是任务的路径名称
app.autodiscover_tasks(["任务1","任务2"])
app.autodiscover_tasks(["mycelery.sms", 'mycelery.order'])
六. 启动流程
启动Celery的命令
切换目录到mycelery根目录下启动
会报错
celery -A mycelery.main worker --loglevel=info
报错的类型
Celery ValueError: not enough values to unpack (expected 3, got 0)
解决办法
pip install eventlet
celery -A worker -l info -P eventlet
定时器启动
celery -A mycelery.main beat
启动celery
celery -A mycelery.main worker -l info -P eventlet
七:多个定时器的配置
7.1 task
import datetime
import uuid
from auction_celery.main import app
from api import models
@app.task(name="check_auction")
def check_auction():
"""拍卖场未开始--->预售"""
# 当前时间,拍卖场开始时间,拍卖场结束时间
now = datetime.datetime.now()
# 拍卖场开始时间开始"""
# 当前时间,拍卖场开始时间,拍卖场结束时间
now = datetime.datetime.now()
# 拍卖场开始时间结束"""
# 当前时间,拍卖场开始时间,拍卖场结束时间
now = datetime.datetime.now()
# 拍卖场开始时间
7.2 config配置
配置文件
1. 有redis有密码
任务队列的链接地址(变量名必须叫这个)
broker_url = 'redis://:foobared@106.14.42.253:8889/14'
结果队列的链接地址(变量名必须叫这个)
result_backend = 'redis://:foobared@106.14.42.253:8889/15'
定时器的配置文件
导入定时器
from celery.schedules import crontab
from .main import app
定时任务的调度列表,用于注册定时任务
app.conf.beat_schedule = {
# Executes every Monday morning at 7:30 a.m.
# check_order_outtime -- 随意起的名字 定时任务的名字,叫什么都都行
'check_auction_out_time': {
# 本次调度的任务
'task': 'check_auction', # 这里的任务名称必须先到main.py中注册,如果写了别名,直接写别名就可以了
# 定时任务的调度周期
# 'schedule': crontab(minute=0, hour=0), # 每周凌晨00:00
'schedule': crontab(), # 每分钟,没写秒数,那么就是每分钟的0秒开始
# 'args': (16, 16), # 注意:任务就是一个函数,所以如果有参数则需要传递
},
'check_auction_pay': {
# 本次调度的任务
'task': 'check_auction_pay', # 这里的任务名称必须先到main.py中注册,如果写了别名,直接写别名就可以了
# 定时任务的调度周期
# 'schedule': crontab(minute=0, hour=0), # 每周凌晨00:00
'schedule': crontab(), # 每分钟,没写秒数,那么就是每分钟的0秒开始
# 'args': (16, 16), # 注意:任务就是一个函数,所以如果有参数则需要传递
},
'check_auction_end': {
# 本次调度的任务
'task': 'check_auction_end', # 这里的任务名称必须先到main.py中注册,如果写了别名,直接写别名就可以了
# 定时任务的调度周期
# 'schedule': crontab(minute=0, hour=0), # 每周凌晨00:00
'schedule': crontab(), # 每分钟,没写秒数,那么就是每分钟的0秒开始
# 'args': (16, 16), # 注意:任务就是一个函数,所以如果有参数则需要传递
},
'check_auction_item_order': {
# 本次调度的任务
'task': 'check_auction_item_order', # 这里的任务名称必须先到main.py中注册,如果写了别名,直接写别名就可以了
# 定时任务的调度周期
# 'schedule': crontab(minute=0, hour=0), # 每周凌晨00:00
'schedule': crontab(), # 每分钟,没写秒数,那么就是每分钟的0秒开始
# 'args': (16, 16), # 注意:任务就是一个函数,所以如果有参数则需要传递
},
}
# 配置时间,如果django,settings里面的时间,没有改,下面两行可以不写,因为django默认utc时间
时区
app.conf.timezone = 'Asia/Shanghai'
是否使用UTC
app.conf.enable_utc = False
- 时间celery配置
8.1 配置1(推荐)
配置时间,如果django,settings里面的时间,没有改,下面两行可以不写,因为django默认utc时间
时区
app.conf.timezone = 'Asia/Shanghai'
是否使用UTC
app.conf.enable_utc = False
8.2 配置2(可能报错)
CELERY_TIMEZONE = "Asia/Shanghai"
CELERY_ENABLE_UTC = False
- 定时器的时间crontab()
1.介绍
其中,crontab()实例化的时候没设置任何参数,都是使用默认值。crontab一共有7个参数,常用有5个参数分别为:
minute:分钟,范围0-59;
hour:小时,范围0-23;
day_of_week:星期几,范围0-6。以星期天为开始,即0为星期天。这个星期几还可以使用英文缩写表示,例如”sun”表示星期天;
day_of_month:每月第几号,范围1-31;
month_of_year:月份,范围1-12。
a、默认参数
这些参数可以设置表达式,表达稍微复杂的设置。默认值都是”*”星号,代表任意时刻。即crontab()相当与:
含义是每天、每小时、每分钟执行一次任务。这说法太反人类语言习惯,简单说就是每1分钟执行一次任务。
crontab(minute='*', hour='*', day_of_week='*', day_of_month='*', month_of_year='*')
b、具体某个值
上面提到这些参数的取值范围。我们可以直接设置某个值。例如:
即每小时的15分时刻执行一次任务。直接指定某个时刻。
crontab(minute=15)
以此类推可以设置每天0点0分时刻执行任务的设置如下:
crontab(minute=0, hour=0)
当然,也可以设置多个值。例如0分和30分执行一次任务:
这里使用字符串,用逗号隔开数值。这里的逗号是表示多个表达式or逻辑关系。
crontab(minute='0,30')
c、设置范围
设置范围也是设置多个值,例如指定9点到12点每个小时的每分钟执行任务。
crontab(minute='*', hour='9-12')
这里*号是默认值,可以省略如下:
crontab(hour='9-12')
上面提到逗号是or逻辑关系。拓展一下,指定9点到12点和20点中每分钟执行任务:
crontab(hour='9-12,20')
crontab的表达式越来越复杂了。celery还提供了一个类得到表达式解析结果,代码如下:
from celery.task.schedules import crontab_parser
r = crontab_parser(23, 0).parse('9-12,20')
print(r)
其中,crontab_parse是一个解析类。第1个参数是范围的最大值;第2个参数是范围的最小值。通过parse输入表达式,可得到表达式的解析结果:
set([9, 10, 11, 12, 20])
d、设置间隔步长
假如我要设置1、3、5、7、9、11月份每天每分钟执行任务,按照上面的做法可以设置如下:
crontab(day_of_month='1,3,5,7,9,11')
观察数据可以发现,都是间隔2的步长。需要设置的数字比较少,若数字比较多显得很麻烦。例如我想每间隔2分钟就执行一次任务,要写30个数字想想就觉得很麻烦。crontab表达式还提供了间隔的处理,例如:
crontab(minute='*/2')
crontab(minute='0-59/2') #效果等同上面
这个/号不是除以的意思。相当与range的第3个参数,例如:
range(0, 59+1, 2)
差不多crontab表达式就这些,多举几个例子:
#每2个小时中每分钟执行1次任务
crontab(hour='*/2')
#每3个小时的0分时刻执行1次任务
#即[0,3,6,9,12,15,18,21]点0分
crontab(minute=0, hour='*/3')
#每3个小时或8点到12点的0分时刻执行1次任务
#即[0,3,6,9,12,15,18,21]+[8,9,10,11,12]点0分
crontab(minute=0, hour='*/3,8-12')
#每个季度的第1个月中,每天每分钟执行1次任务
#月份范围是1-12,每3个月为[1,4,7,10]
crontab(month_of_year='*/3')
#每月偶数天数的0点0分时刻执行1次任务
crontab(minute=0, hour=0, day_of_month='2-31/2')
#每年5月11号的0点0分时刻执行1次任务
crontab(0, 0, day_of_month='11', month_of_year='5')
Original: https://blog.csdn.net/qq_52385631/article/details/122783121
Author: 骑台风走
Title: celery(分布式任务队列)介绍+在django中异步回调使用+定时任务的使用
原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/734554/
转载文章受原作者版权保护。转载请注明原作者出处!