celery(分布式任务队列)介绍+在django中异步回调使用+定时任务的使用

  1. Celery介绍

Celery是一个功能完备即插即用的异步任务队列系统。它适用于异步处理问题,当发送邮件、或者文件上传, 图像处理等等一些比较耗时的操作,我们可将其异步执行,这样用户不需要等待很久,提高用户体验。

简单,易于使用和维护,有丰富的文档。

高效,单个celery进程每分钟可以处理数百万个任务。

灵活,celery中几乎每个部分都可以自定义扩展。

总之一句话,快

一. 官方文档

中文文档celery(分布式任务队列)介绍+在django中异步回调使用+定时任务的使用http://docs.jinkan.org/docs/celery/getting-started/index.html ;

二. 流程图

celery(分布式任务队列)介绍+在django中异步回调使用+定时任务的使用

三. 异步回调使用

1.安装

-U是update的意思,有就进行更新,没有就安装,后面单独将celery运行起来就可以了

pip install redis==3.4.1 -i https://pypi.douban.com/simple/
pip install celery==4.4.2 -i https://pypi.douban.com/simple/

2. 使用目录介绍.

a. 使用celery第一件要做的最为重要的事情是需要先创建一个Celery实例,我们一般叫做celery应用,或者更简单直接叫做一个app。

b. app应用是我们使用celery所有功能的入口,比如创建任务,管理任务等,在使用celery的时候,app必须能够被其他的模块导入。

c. 一般celery任务目录直接放在项目的根目录下即可,路径:

├── mycelery/
    ├── config.py     # 配置文件
    ├── __init__.py
    ├── main.py       # 主程序
    └── sms/          # 一个目录可以放置多个任务,该目录下存放当前任务执行时需要的模块或依赖,也可以每个任务单独一个目录
        └── tasks.py  # 任务的文件,名称必须是这个!!!

3. 主要文件

1.main.py

主程序
import os
from celery import Celery

创建celery实例对象
app = Celery("dbj")  # celery对象可以创建多个,所以我们最好给我们当前的celery应用起个名字,比如叫做dbj

把celery和django进行组合,需要识别和加载django的配置文件
import os

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'dbj.settings.dev')
如果只是使用了logging日志功能的话可以不写以下两句,因为logging是python提供的模块,但是将来可能使用celery来执行其他的django任务,所以我们先写上
import django

django.setup()

通过app对象加载配置
app.config_from_object("mycelery.config")

加载任务
参数必须必须是一个列表,里面的每一个任务都是任务的路径名称
app.autodiscover_tasks(["任务1","任务2"])
app.autodiscover_tasks(["mycelery.sms",])

2. config.py

配置文件
1. 有redis有密码
任务队列的链接地址(变量名必须叫这个)
broker_url = 'redis://:foobared@127.0.0.1:6379/14'
结果队列的链接地址(变量名必须叫这个)
result_backend = 'redis://:foobared@127.0.0.1:6379/15'

2. redis无密码写法
任务队列的链接地址(变量名必须叫这个)
broker_url = 'redis://127.0.0.1:6379/14'
结果队列的链接地址(变量名必须叫这个)
result_backend = 'redis://127.0.0.1:6379/15'

3. 创建一个任务文件sms/tasks.py,并创建任务,代码:

celery的任务必须写在tasks.py的文件中,别的文件名称不识别!!!

from mycelery.main import app

@app.task(name="send_sms")  # name表示设置任务的名称,如果不填写,则默认使用函数名(路径)做为任务名
def send_sms():
    print("发送短信!!!")

@app.task  # name表示设置任务的名称,如果不填写,则默认使用函数名做为任务名
def send_sms2():
    print("发送短信任务2!!!")

3.1 创建任务案例

from mycelery.main import app
from lyapi.libs.tencent.sms import send_sms_single

任务列表,在main里面会被自动发现,但不会执行,只有send_sms.delay(code, phone) 才执行
@app.task(name='send_sms')  # 给任务起的别名叫send_sms
def send_sms(phone_num, template_id, template_param_list):
    ret = send_sms_single(phone_num, template_id, template_param_list)
    # ret 还是成功信息/错误信息
    return ret

4. 控制台启动

启动Celery的命令
切换目录到mycelery根目录下启动
celery -A mycelery.main worker --loglevel=info

5. 最后一步,views使用

import datetime
from auc_celery.sms.tasks import send_sms

celery -A auc_celery.main worker -l info -P eventlet
def text(request):
    """生成id"""
    # 1.立即执行
    res = send_sms.delay(1, 2)
"""
    # 2.定时任务(只执行一次,不是周期性定时任务)
    ctime = datetime.datetime.now()
    # 本地时间转换成utc时间
    utc_ctime = datetime.datetime.utcfromtimestamp(ctime.timestamp())
    target_time = utc_ctime + datetime.timedelta(seconds=3)
    res = send_sms.apply_async(args=[6, 3], eta=target_time)
"""
    return HttpResponse(res.id)

def text2(request):
    """根据id找任务结果及相关"""
    nid = request.GET.get('nid')
    # 导入包
    from celery.result import AsyncResult
    # 导入app
    from auc_celery.sms.tasks import app
    # 导入内置的函数,获取任务对象
    result_object = AsyncResult(id=nid, app=app)
    # 获取返回结果
    # data1 = result_object.get()  # 3
    # 获取状态码
    # status = result_object.status  # SUCCESS
    # # 取消任务
    # result_object.revoke()
    # print('success', result_object.successful())
    # print(data1)
    if result_object.successful():
        # print('success', result_object.successful())
        # 获取结果
        data = result_object.get()
        print("data", data)
        # 清空
        # result_object.forget()
    # 失败的逻辑处理
    elif result_object.failed():
        print('失败')
    else:
        print('未知')
    return HttpResponse('...')

周期定时器
celery -A auc_celery.main beat
celery启动 -A--->文件名 worker--->启动工作者  -l info--->日志等级  -P eventlet--->解决在windows上报错的问题
celery  -A auc_celery.main worker -l info -P eventlet
"""
小贴士: 如果需要celery保存上次任务运行的时间在数据文件中,通过 -s 指定一个名为celerybeat-schedule的文件即可:
celery -A proj beat -s /home/celery/var/run/celerybeat-schedule
"""

四. 如果报以下错误

Celery ValueError: not enough values to unpack (expected 3, got 0)

报错原因:

win10上运行celery4.x以上的版本就会出现这个问题

解决办法:

1.安装一个eventlet模块

pip3 install eventlet -i https://pypi.douban.com/simple/

2. 然后启动celery的时候加一个参数

celery -A  worker -l info -P eventlet

启动案例:

celery -A mycelery.main worker -l info -P eventlet

输入到文件

celery -A cat_celery.main worker -l info -P eventlet --logfile=D:/pythonProject/cat/logs/celery.log

ps:

详细描述错误原因

错误详细介绍celery(分布式任务队列)介绍+在django中异步回调使用+定时任务的使用https://blog.csdn.net/qq_52385631/article/details/122786364?spm=1001.2014.3001.5501 ;

五. 定时器使用(配置同上,)

定时器官方文档celery(分布式任务队列)介绍+在django中异步回调使用+定时任务的使用http://docs.celeryproject.org/en/latest/userguide/periodic-tasks.html ;

a. 在clery文件夹里面创建一个包,order

celery(分布式任务队列)介绍+在django中异步回调使用+定时任务的使用

b. 在config.py配置中生成定时器

配置文件
1. 有redis有密码
任务队列的链接地址(变量名必须叫这个)
broker_url = 'redis://:foobared@127.0.0.1:6379/14'
结果队列的链接地址(变量名必须叫这个)
result_backend = 'redis://:foobared@127.0.0.1:6379/15'

2. redis无密码写法
任务队列的链接地址(变量名必须叫这个)
broker_url = 'redis://127.0.0.1:6379/14'
结果队列的链接地址(变量名必须叫这个)
result_backend = 'redis://127.0.0.1:6379/15'

定时器的配置文件
导入定时器
from celery.schedules import crontab
from .main import app

定时任务的调度列表,用于注册定时任务
app.conf.beat_schedule = {
    # Executes every Monday morning at 7:30 a.m.

    # check_order_outtime -- 随意起的名字 定时任务的名字,叫什么都都行
    'check_order_outtime': {
        # 本次调度的任务
        'task': 'check_order',  # 这里的任务名称必须先到main.py中注册,如果写了别名,直接写别名就可以了
        # 定时任务的调度周期
        # 'schedule': crontab(minute=0, hour=0),   # 每周凌晨00:00
        'schedule': crontab(),  # 每分钟,没写秒数,那么就是每分钟的0秒开始
        # 'args': (16, 16),  # 注意:任务就是一个函数,所以如果有参数则需要传递
    },
}

c. 定时器执行的任务 tasks.py(文件一定要叫这个名字)

from mycelery.main import app
from order.models import Order
import datetime

@app.task(name="check_order")
def check_order():
    # 进行订单时间的校验
    #  支付时间 < 当前时间 + 时间段 ,把订单状态进行修改
    # 我们配置的1分钟执行1次
    return 'ok'

定时器执行的任务案例

from mycelery.main import app
from order.models import Order
import datetime

@app.task(name="check_order")
def check_order():
    # 进行订单时间的校验
    #  支付时间 < 当前时间 + 时间段 ,把订单状态进行修改
    timer = datetime.datetime.now() + datetime.timedelta(days=1)
    Order.objects.filter(pay_time__lte=timer, order_status=0).update(order_status=3)
    return 'ok'

d. main.py 配置自动发现

app.autodiscover_tasks(["mycelery.sms", 'mycelery.order'])

完整版 main.py

主程序
import os
from celery import Celery

创建celery实例对象
app = Celery("xxxx")  # celery对象可以创建多个,所以我们最好给我们当前的celery应用起个名字,比如叫做xxx

把celery和django进行组合,需要识别和加载django的配置文件
import os

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'lyapi.settings.dev')
如果只是使用了logging日志功能的话可以不写以下两句,因为logging是python提供的模块,但是将来可能使用celery来执行其他的django任务,所以我们先写上
import django

django.setup()

通过app对象加载配置
app.config_from_object("mycelery.config")

加载任务
参数必须必须是一个列表,里面的每一个任务都是任务的路径名称
app.autodiscover_tasks(["任务1","任务2"])
app.autodiscover_tasks(["mycelery.sms", 'mycelery.order'])

六. 启动流程

启动Celery的命令
切换目录到mycelery根目录下启动
会报错
celery -A mycelery.main worker --loglevel=info

报错的类型
Celery ValueError: not enough values to unpack (expected 3, got 0)

解决办法
pip install eventlet
celery -A  worker -l info -P eventlet

定时器启动
celery -A mycelery.main beat
启动celery
celery -A mycelery.main worker -l info -P eventlet

七:多个定时器的配置

7.1 task

import datetime
import uuid
from auction_celery.main import app

from api import models

@app.task(name="check_auction")
def check_auction():
    """拍卖场未开始--->预售"""
    # 当前时间,拍卖场开始时间,拍卖场结束时间
    now = datetime.datetime.now()
    # 拍卖场开始时间开始"""
    # 当前时间,拍卖场开始时间,拍卖场结束时间
    now = datetime.datetime.now()
    # 拍卖场开始时间结束"""
    # 当前时间,拍卖场开始时间,拍卖场结束时间
    now = datetime.datetime.now()
    # 拍卖场开始时间

7.2 config配置

配置文件
1. 有redis有密码
任务队列的链接地址(变量名必须叫这个)
broker_url = 'redis://:foobared@106.14.42.253:8889/14'
结果队列的链接地址(变量名必须叫这个)
result_backend = 'redis://:foobared@106.14.42.253:8889/15'

定时器的配置文件
导入定时器
from celery.schedules import crontab
from .main import app

定时任务的调度列表,用于注册定时任务
app.conf.beat_schedule = {
    # Executes every Monday morning at 7:30 a.m.

    # check_order_outtime -- 随意起的名字 定时任务的名字,叫什么都都行
    'check_auction_out_time': {
        # 本次调度的任务
        'task': 'check_auction',  # 这里的任务名称必须先到main.py中注册,如果写了别名,直接写别名就可以了
        # 定时任务的调度周期
        # 'schedule': crontab(minute=0, hour=0),   # 每周凌晨00:00
        'schedule': crontab(),  # 每分钟,没写秒数,那么就是每分钟的0秒开始
        # 'args': (16, 16),  # 注意:任务就是一个函数,所以如果有参数则需要传递
    },
    'check_auction_pay': {
        # 本次调度的任务
        'task': 'check_auction_pay',  # 这里的任务名称必须先到main.py中注册,如果写了别名,直接写别名就可以了
        # 定时任务的调度周期
        # 'schedule': crontab(minute=0, hour=0),   # 每周凌晨00:00
        'schedule': crontab(),  # 每分钟,没写秒数,那么就是每分钟的0秒开始
        # 'args': (16, 16),  # 注意:任务就是一个函数,所以如果有参数则需要传递
    },
    'check_auction_end': {
        # 本次调度的任务
        'task': 'check_auction_end',  # 这里的任务名称必须先到main.py中注册,如果写了别名,直接写别名就可以了
        # 定时任务的调度周期
        # 'schedule': crontab(minute=0, hour=0),   # 每周凌晨00:00
        'schedule': crontab(),  # 每分钟,没写秒数,那么就是每分钟的0秒开始
        # 'args': (16, 16),  # 注意:任务就是一个函数,所以如果有参数则需要传递
    },
    'check_auction_item_order': {
        # 本次调度的任务
        'task': 'check_auction_item_order',  # 这里的任务名称必须先到main.py中注册,如果写了别名,直接写别名就可以了
        # 定时任务的调度周期
        # 'schedule': crontab(minute=0, hour=0),   # 每周凌晨00:00
        'schedule': crontab(),  # 每分钟,没写秒数,那么就是每分钟的0秒开始
        # 'args': (16, 16),  # 注意:任务就是一个函数,所以如果有参数则需要传递
    },
}
# 配置时间,如果django,settings里面的时间,没有改,下面两行可以不写,因为django默认utc时间
时区
app.conf.timezone = 'Asia/Shanghai'
是否使用UTC
app.conf.enable_utc = False

  1. 时间celery配置

8.1 配置1(推荐)

&#x914D;&#x7F6E;&#x65F6;&#x95F4;&#xFF0C;&#x5982;&#x679C;django&#xFF0C;settings&#x91CC;&#x9762;&#x7684;&#x65F6;&#x95F4;&#xFF0C;&#x6CA1;&#x6709;&#x6539;&#xFF0C;&#x4E0B;&#x9762;&#x4E24;&#x884C;&#x53EF;&#x4EE5;&#x4E0D;&#x5199;&#xFF0C;&#x56E0;&#x4E3A;django&#x9ED8;&#x8BA4;utc&#x65F6;&#x95F4;
&#x65F6;&#x533A;
app.conf.timezone = 'Asia/Shanghai'
&#x662F;&#x5426;&#x4F7F;&#x7528;UTC
app.conf.enable_utc = False

8.2 配置2(可能报错)

CELERY_TIMEZONE = "Asia/Shanghai"
CELERY_ENABLE_UTC = False
  1. 定时器的时间crontab()

1.介绍

其中,crontab()实例化的时候没设置任何参数,都是使用默认值。crontab一共有7个参数,常用有5个参数分别为:
minute:分钟,范围0-59;
hour:小时,范围0-23;
day_of_week:星期几,范围0-6。以星期天为开始,即0为星期天。这个星期几还可以使用英文缩写表示,例如”sun”表示星期天;
day_of_month:每月第几号,范围1-31;
month_of_year:月份,范围1-12。

a、默认参数

这些参数可以设置表达式,表达稍微复杂的设置。默认值都是”*”星号,代表任意时刻。即crontab()相当与:
含义是每天、每小时、每分钟执行一次任务。这说法太反人类语言习惯,简单说就是每1分钟执行一次任务。

crontab(minute='*', hour='*', day_of_week='*', day_of_month='*', month_of_year='*')

b、具体某个值

上面提到这些参数的取值范围。我们可以直接设置某个值。例如:
即每小时的15分时刻执行一次任务。直接指定某个时刻。

crontab(minute=15)

以此类推可以设置每天0点0分时刻执行任务的设置如下:

crontab(minute=0, hour=0)

当然,也可以设置多个值。例如0分和30分执行一次任务:
这里使用字符串,用逗号隔开数值。这里的逗号是表示多个表达式or逻辑关系。

crontab(minute='0,30')

c、设置范围

设置范围也是设置多个值,例如指定9点到12点每个小时的每分钟执行任务。

 crontab(minute='*', hour='9-12')

这里*号是默认值,可以省略如下:

 crontab(hour='9-12')

上面提到逗号是or逻辑关系。拓展一下,指定9点到12点和20点中每分钟执行任务:

 crontab(hour='9-12,20')

crontab的表达式越来越复杂了。celery还提供了一个类得到表达式解析结果,代码如下:

 from celery.task.schedules import crontab_parser
 r = crontab_parser(23, 0).parse('9-12,20')
 print(r)

其中,crontab_parse是一个解析类。第1个参数是范围的最大值;第2个参数是范围的最小值。通过parse输入表达式,可得到表达式的解析结果:

 set([9, 10, 11, 12, 20])

d、设置间隔步长

假如我要设置1、3、5、7、9、11月份每天每分钟执行任务,按照上面的做法可以设置如下:

crontab(day_of_month='1,3,5,7,9,11')

观察数据可以发现,都是间隔2的步长。需要设置的数字比较少,若数字比较多显得很麻烦。例如我想每间隔2分钟就执行一次任务,要写30个数字想想就觉得很麻烦。crontab表达式还提供了间隔的处理,例如:

 crontab(minute='*/2')
 crontab(minute='0-59/2') #&#x6548;&#x679C;&#x7B49;&#x540C;&#x4E0A;&#x9762;

这个/号不是除以的意思。相当与range的第3个参数,例如:

 range(0, 59+1, 2)

差不多crontab表达式就这些,多举几个例子:

#&#x6BCF;2&#x4E2A;&#x5C0F;&#x65F6;&#x4E2D;&#x6BCF;&#x5206;&#x949F;&#x6267;&#x884C;1&#x6B21;&#x4EFB;&#x52A1;
crontab(hour='*/2')

#&#x6BCF;3&#x4E2A;&#x5C0F;&#x65F6;&#x7684;0&#x5206;&#x65F6;&#x523B;&#x6267;&#x884C;1&#x6B21;&#x4EFB;&#x52A1;
#&#x5373;[0,3,6,9,12,15,18,21]&#x70B9;0&#x5206;
crontab(minute=0, hour='*/3')

#&#x6BCF;3&#x4E2A;&#x5C0F;&#x65F6;&#x6216;8&#x70B9;&#x5230;12&#x70B9;&#x7684;0&#x5206;&#x65F6;&#x523B;&#x6267;&#x884C;1&#x6B21;&#x4EFB;&#x52A1;
#&#x5373;[0,3,6,9,12,15,18,21]+[8,9,10,11,12]&#x70B9;0&#x5206;
crontab(minute=0, hour='*/3,8-12')

#&#x6BCF;&#x4E2A;&#x5B63;&#x5EA6;&#x7684;&#x7B2C;1&#x4E2A;&#x6708;&#x4E2D;&#xFF0C;&#x6BCF;&#x5929;&#x6BCF;&#x5206;&#x949F;&#x6267;&#x884C;1&#x6B21;&#x4EFB;&#x52A1;
#&#x6708;&#x4EFD;&#x8303;&#x56F4;&#x662F;1-12&#xFF0C;&#x6BCF;3&#x4E2A;&#x6708;&#x4E3A;[1,4,7,10]
crontab(month_of_year='*/3')

#&#x6BCF;&#x6708;&#x5076;&#x6570;&#x5929;&#x6570;&#x7684;0&#x70B9;0&#x5206;&#x65F6;&#x523B;&#x6267;&#x884C;1&#x6B21;&#x4EFB;&#x52A1;
crontab(minute=0, hour=0, day_of_month='2-31/2')

#&#x6BCF;&#x5E74;5&#x6708;11&#x53F7;&#x7684;0&#x70B9;0&#x5206;&#x65F6;&#x523B;&#x6267;&#x884C;1&#x6B21;&#x4EFB;&#x52A1;
crontab(0, 0, day_of_month='11', month_of_year='5')

Original: https://blog.csdn.net/qq_52385631/article/details/122783121
Author: 骑台风走
Title: celery(分布式任务队列)介绍+在django中异步回调使用+定时任务的使用

原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/734554/

转载文章受原作者版权保护。转载请注明原作者出处!

(0)

大家都在看

亲爱的 Coder【最近整理,可免费获取】👉 最新必读书单  | 👏 面试题下载  | 🌎 免费的AI知识星球