Celery异步任务

情景: 用户发起request,并等待response返回。在本些views中,可能需要执行一段耗时的程序,那么用户就会等待很长时间,造成不好的用户体验,比如发送邮件、手机验证码等。

使用celery后,情况就不一样了。

解决: 将耗时的程序放到celery中执行。

Celery 是一个包含一系列的消息任务队列。您可以不用了解内部的原理直接使用,它的使用时非常简单的。

  • 选择并且安装一个消息中间件(Broker)
  • 安装 Celery 并且创建第一个任务
  • 运行职程(Worker)以及调用任务
  • 跟踪任务的情况以及返回值

Celery名词:

  • 任务 task:就是一个Python函数。
  • 队列 queue:将需要执行的任务加入到队列中。
  • 工人 worker:在一个新进程中,负责执行队列中的任务。
  • 代理人 broker:负责调度,在布置环境中使用redis。

选择中间人(Broker)

Celery 需要一个中间件来进行接收和发送消息,通常以独立的服务形式出现,成为 消息中间人(Broker)

以下有几种选择:

  • RabbitMQ:RabbitMQ 的功能比较齐全、稳定、便于安装。在生产环境来说是首选的。
  • Redis:Redis 功能比较全,但是如果突然停止运行或断电会造成数据丢失。

安装 Celery

可以用标准的 Python 工具,诸如 pipeasy_install 来安装:

$ pip install celery

Celery单应用

创建第一个 Celery 实例程序,我们把创建 Celery 程序成为 Celery 应用或直接简称 为 app,创建的第一个实例程序可能需要包含 Celery 中执行操作的所有入口点,例如创建任务、管理职程(Worker)等,所以必须要导入 Celery 模块。

在本教程中将所有的内容,保存为一个 app 文件中。针对大型的项目,可能需要创建 独立的模块。

首先创建 utils/tasks.py

import os
import sys
import django
from celery import Celery
from django.conf import settings
from django.core.mail import send_mail
from itsdangerous import TimedJSONWebSignatureSerializer as Serializer

project_path = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))  # 确定项目路径
sys.path.append(project_path)  # 将项目路径添加到系统搜索路径中
os.environ['DJANGO_SETTINGS_MODULE'] = 'django_test.settings'  # 设置项目的配置文件
django.setup()  # Django初始化

app = Celery('tasks', broker='amqp://guest@localhost//')

@app.task
def task_send_email(user_id, username, email):
    # 加密用户信息,生成token
    info = {'confirm': user_id}
    serializer = Serializer(settings.SECRET_KEY, 3600)
    token = serializer.dumps(info).decode()

    # 发送邮件
    '''发送重置密码邮件'''
    reset_url = "{}/user/reset/password/?token={}".format(settings.HOST_URL, token)
    subject = "重置密码"
    message = '邮件正文'
    sender = settings.DEFAULT_FROM_EMAIL
    receiver = [email]
    html_message = '{}, 您好!请在1小时内点击下面链接重置密码:{}'.format(username, reset_url, reset_url)
    send_mail(subject, message, sender, receiver, html_message=html_message)

  • 第一个参数为当前模块的名称,只有在 __main__ 模块中定义任务时才会生产名称。
  • 第二个参数为中间人(Broker)的链接 URL ,实例中使用的 RabbitMQ(Celery默认使用的也是RabbitMQ)。 更多相关的 Celery 中间人(Broker)的选择方案,可查阅上面的中间人(Broker)。 例如,对于 RabbitMQ 可以写为 amqp://localhost ,使用 Redis 可以写为 redis://localhost
  • 创建了一个名称为 task_send_email 的任务,实现邮件的发送。

注意: 在 单文件中,想要导入 Django项目的 settings.py配置文件

  • 先确定Django项目所在的绝对路径
  • 追加Django所在的路径到系统搜索路径
  • 配置系统的Django环境变量

现在可以使用 worker 参数进行执行我们刚刚创建职程(Worker):

注意: 此时运行celery,是在Django项目下根目录中, 此时启动文件路径必须和 调用 异步任务的导包路径一致,否则,不识别**, -A celery实例名,即为当前文件的名

$ celery -A utils.task worker --loglevel=info

Celery集成django使用

注意: 以前版本的Celery需要一个单独的库才能与Django一起使用,但是从3.1开始,情况不再如此。现在就已支持Django。

注意: Celery 4.0支持Django 1.8和更高版本。对于Django 1.8之前的版本,请使用Celery 3.1。

要将Celery与Django项目一起使用,必须首先定义Celery库的实例(称为 “app”)

如果您拥有当前主流的Django项目结构,例如:

- django_test/
  - manage.py
  - django_test/
    - __init__.py
    - settings.py
    - urls.py

那么建议的方法是创建一个新的 django_test/django_test/celery.py模块,该模块定义 Celery实例

file: django_test/django_test/celery.py

from __future__ import absolute_import, unicode_literals
import os
from celery import Celery

set the default Django settings module for the 'celery' program.

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'django_test.settings')

app = Celery('django_test')

Using a string here means the worker doesn't have to serialize
the configuration object to child processes.

- namespace='CELERY' means all celery-related configuration keys
  should have a CELERY_ prefix.

app.config_from_object('django.conf:settings', namespace='CELERY')

Load task modules from all registered Django app configs.

app.autodiscover_tasks()

@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))

然后,需要将此程序导入到 django_test/django_test/__ init__.py模块中。这样可以确保在Django启动时加载应用程序,以便@shared_task装饰器将使用该应用程序:

file: django_test/django_test/__init__.py:

from __future__ import absolute_import, unicode_literals

This will make sure the app is always imported when
Django starts so that shared_task will use this app.

from .celery import app as celery_app

__all__ = ('celery_app',)

请注意,此示例项目结构适用于较大的项目,对于简单项目,可以使用一个包含的模块来定义应用程序和任务。

让我们分解一下第一个模块中发生的情况,首先我们从 __freture__导入 absolute_import导入 unicode_literals,以使我们的 celery.py模块不会与库冲突:

from __future__ import absolute_import

然后,为celery命令行程序设置默认的 DJANGO_SETTINGS_MODULE环境变量:

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'django_test.settings')

不需要此行,但可以避免始终将设置模块传递到celery程序中。它必须始终在创建应用程序实例之前出现,我们接下来要做的是:

app = Celery('django_test')

这是我们的库实例,可以有很多实例,但是使用Django时可能没有理由。 我们还将Django设置模块添加为Celery的配置源。这意味着不必使用多个配置文件,而直接从Django设置中配置Celery;但也可以根据需要将它们分开。

app.config_from_object('django.conf:settings', namespace='CELERY')

大写名称空间意味着所有Celery配置选项必须以大写而不是小写指定,并以 CELERY_开头,因此例如task_always_eager设置变为 CELERY_TASK_ALWAYS_EAGERbroker_url设置变为 CELERY_BROKER_URL

可以直接传递设置对象,但是使用字符串更好,因为这样一来,工作人员就不必序列化该对象。 CELERY_名称空间也是可选的,但建议使用(以防止与其他Django设置重叠)。

接下来,可重用应用程序的常见做法是在单独的task.py模块中定义所有任务,而Celery可以自动发现这些模块:

app.autodiscover_tasks()

在上述行中,Celery将按照task.py约定自动发现所有已安装应用程序中的任务:

- app1/
    - tasks.py
    - models.py
- app2/
    - tasks.py
    - models.py

这样,不必手动将各个模块添加到CELERY_IMPORTS设置中。 最后,debug_task示例是一个转储其自己的请求信息的任务。这是使用Celery 3.1中引入的新的 bind = True任务选项来轻松引用当前任务实例。

编写的任务可能会存在于可重用应用程序中,并且可重用应用程序不能依赖于项目本身,因此不能直接导入应用程序实例。 @shared_task装饰器可让您创建任务而无需任何具体的应用程序实例:

file: user/tasks.py:

Create your tasks here
from __future__ import absolute_import, unicode_literals
from celery import shared_task
from django.conf import settings
from django.core.mail import send_mail
from itsdangerous import TimedJSONWebSignatureSerializer as Serializer

@shared_task
def task_send_email(user_id, username, email):
    # 加密用户信息,生成token
    info = {'confirm': user_id}
    serializer = Serializer(settings.SECRET_KEY, 3600)
    token = serializer.dumps(info).decode()

    # 发送邮件
    '''发送重置密码邮件'''
    reset_url = "{}/user/reset/password/?token={}".format(settings.HOST_URL, token)
    subject = "重置密码"
    message = '邮件正文'
    sender = settings.DEFAULT_FROM_EMAIL
    receiver = [email]
    html_message = '{}, 您好!请在1小时内点击下面链接重置密码:{}'.format(username, reset_url,reset_url)
    send_mail(subject, message, sender, receiver, html_message=html_message)

现在可以使用 worker 参数进行执行我们刚刚创建职程(Worker):

注意: 此时运行celery,是在Django项目下-A celery实例名,即为Django项目的同名目录

$ celery -A django_test worker --loglevel=info

调用任务

需要调用我们创建的实例任务,可以通过 delay() 进行调用。

from django.contrib.auth import authenticate
from rest_framework.views import APIView
from rest_framework.response import Response
from rest_framework.permissions import IsAuthenticated
from user.tools.auth import JSONWebTokenAuthentication # 自定义token认证类
from .task import task_send_email # 导入Django集成的task任务
from utils.task import task_send_email  # 导入 celery单应用的task任务

class ResetPassword(APIView):
    authentication_classes = [JSONWebTokenAuthentication]
    permission_classes = [IsAuthenticated]

    def post(self, request):
        user = request.user

        task_send_email.delay(user_id=user.id, username=user.username, email=user.email)
        return Response('发送成功')

该任务已经有职程( Worker)开始处理,可以通过控制台输出的日志进行查看执行情况。

Original: https://www.cnblogs.com/gaogang/p/14843752.html
Author: 小高、
Title: Celery异步任务

原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/586042/

转载文章受原作者版权保护。转载请注明原作者出处!

(0)

大家都在看

  • redis的事务不是原子性

    Reference: https://blog.csdn.net/u011692780/article/details/81213010 一、事务的四大特性 关系型数据库的事务具有…

    Linux 2023年5月28日
    088
  • Redis Hashes 数据类型简述

    Redis Hashes 是我们日常使用中比较高频的 Redis 数据类型,内部使用 Redis 字典结构存储,底层实现之一为哈希表结构。 下面从哈希表节点,哈下表结构,Redis…

    Linux 2023年5月28日
    085
  • VMware ESXi 7.0 U2 SLIC 2.6 & Unlocker 集成 Intel NUC 网卡、USB 网卡和 NVMe 驱动

    404. 抱歉,您访问的资源不存在。 可能是URL不正确,或者对应的内容已经被删除,或者处于隐私状态。 [En] It may be that the URL is incorre…

    Linux 2023年5月27日
    088
  • Ubuntu更换镜像源

    当修改 sources.list文件时,我们需要将下面任意一个镜像源的代码 复制粘贴到该文件中。 阿里源 阿里镜像源 deb http://mirrors.aliyun.com/u…

    Linux 2023年6月14日
    093
  • python写一个双色球彩票计算器

    首先声明,赌博一定不是什么好事,也完全没有意义,不要指望用彩票发财。之所以写这个,其实是用来练手的,可以参考这个来预测一些其他的东西,意在抛砖引玉。 啰嗦完了,马上开始,先上伪代码…

    Linux 2023年6月6日
    0107
  • 继承与初始化

    了解包括继承在内的初始化全过程: 执行结果: 程序运行时,先试图访问Beetle.main()(一个static方法),访问Beetle类的静态方法将会使Beetle类加载,即加载…

    Linux 2023年6月8日
    0110
  • maven安装及导入本地jar包

    一、maven的安装方法 1.去maven官网下载适合的版本 下载地址: 官方下载地址 2.下载后解压到任意目录 3.配置系统环境变量 M2_HOME ,值为maven解压后的目录…

    Linux 2023年6月14日
    0127
  • MyCAT实现MySQL读写分离

    用户连接到MySQL的中间件(代理),中间件接收用户的访问转发给后端的mysql数据库。 是MySQL的一个中间件软件,Mycat是一个开源的分布式数据库系统,是一个实现了MySQ…

    Linux 2023年6月7日
    088
  • python入门基础知识三(列表和元组)

    1. 形式 var = [‘char1′,’char2′,’char3’,…] var = [v…

    Linux 2023年6月7日
    0106
  • jmeter学习记录–05–Beanshell2

    学习beanshell时有不少的例子、遇到不少问题。在此记录下。 测试实例列表 A1:使用Beanshell请求作为测试请求 一个打包的Jar包,直接对其内的方法进行测试。 第一步…

    Linux 2023年5月28日
    0110
  • [极客大挑战 2019]Secret File

    0x01 寻找做题信息 打开环境,查看源代码,发现可疑链接,/Archive_room.php,action.php打开action.php会发生302跳转,查找302跳转无果,百…

    Linux 2023年6月8日
    0101
  • java分布式(第四章)——Redis

    老套路 1、什么是Redis 2、为什么要用Redis 3、怎么用Redis 4、使用Redis过程中遇到的问题 1、什么是Redis 介绍Redis之前先了解一下Nosql(非关…

    Linux 2023年6月7日
    084
  • 001.IT运维面试问题-Linux基础

    Redhat、CentOS、Fedora、SuSE、Debian、Ubuntu、FreeBSD等。 ⑴开机BIOS自检,加载硬盘。 ⑵读取MBR,MBR引导。 ⑶grub引导菜单(…

    Linux 2023年6月13日
    0103
  • beego 的打包问题

    使用BeeGo2.0 编译后 打包上传到服务器出现运行时问题。在app.conf配置文件开发环境改为生产环境就行了 runmode = prod Original: https:/…

    Linux 2023年6月13日
    095
  • 重写并自定义依赖的原生的Bean方法

    转载请注明出处: 在项目开发过程中,往往是直接应用很多jar包中依赖且声明好的Bean,拿来即用,但很多场景也需要对这些原生的Bean 进行自定义,定制化封装,这样在项目使用的过程…

    Linux 2023年6月15日
    0128
  • 【转载】人才成长攻略

    本文转载自知乎《前些天在知乎回复了一个帖子:怎么劝大四室友不要考计算机研?- 曹政的回答》,原作者曹政 评论里有一堆阴阳怪气的说法,什么没天赋怎么办,程序员也不是终身可靠的职业云云…

    Linux 2023年6月13日
    090
亲爱的 Coder【最近整理,可免费获取】👉 最新必读书单  | 👏 面试题下载  | 🌎 免费的AI知识星球