python-celery基本使用

python版本库:

python:3.8.12

redis:4.1.3

celery:4.3.0

这个版本的搭配可用

启动命令:

celery_task是celery工程名

  • celery -A celery_task worker -l info -c 并发数
  • celery -A celery_task worker -l info -P eventlet
  • ==========================================
  • 定时任务的命令:
  • 启动 Beat 程序$ celery beat -A proj
    Celery Beat进程会读取配置文件的内容,周期性的将配置中到期需要执行的任务发送给任务队列
  • 之后启动 worker 进程.$ celery -A proj worker –loglevel=INFO 或者$ celery -B -A proj worker –loglevel=INFO

参数说明:

-A :表示工程名,写了celery代码的py文件,只要写文件名不要py后缀

-l :表示监控的时候,输出的消息,有info、warn、error

-c :表示并发数

一、概念

celery是python用于异步处理任务的第三方库,是一种分布式队列的管理构建,支持异步任务和定时任务。在web开发的时候,可以处理一定程度的高并发现象。这里借用一张图:

python-celery基本使用

Producer:为任务的生产者,比如:你的程序调用一个注册账号的函数,这个程序就是生产者。

Broker为中间人:ta能检测任务请求,并开启并发模式,交由workers进行处理。

backend:就是任务执行结束的结果存储的地方,这个地方一般为数据库,例如:redis、RabbitMQ、librabbitmq等

总而言之:这个方法在一定程度上解决python因为GIL锁造成速度慢

二、异步任务

  1. 单目录

(1)celery_test.py: celery启动的文件,里面包含配置、任务

异步的服务
import celery
import time

broker = 'redis://127.0.0.1:6379/1' # 消息中间件
backend = "redis://127.0.0.1:6379/2" # 结果存储

cel = celery.Celery('test', backend=backend, broker=broker)

@cel.task
def send_email(name):
    print("向%s发送邮箱。。。" % name)
    time.sleep(5)
    print("向%s发送邮箱完成..." % name)
    return "ok"

@cel.task
def send_msg(name):
    print("向%s发送短信。。。" % name)
    time.sleep(5)
    print("向%s发送短信完成..." % name)
    return "ok"

(2)produce_task.py: 生产者文件,用于调用celery监听的函数

生产者
from celery_task import send_email, send_msg

for i in range(20):
    result = send_email.delay("lufei")
    print(result.id)
    result2 = send_msg.delay("nrduo")
    print(result2.id)

输出任务的id,可以通过id来获取到任务执行完的结果

(3)result.py: 结果查询消费者,可用于查询结果,也可以将获取到的结果进行删除,通过id进行查询

消费者
from celery.result import AsyncResult
from celery_test.celery_task import cel

async_result = AsyncResult(id='09a3939c-fe06-41d5-a5c7-7c3840ef3aa5', app=cel)

if async_result.successful():
    result = async_result.get()
    print(result)
    # 将结果删除,,执行完成,结果不会自动删除
    # result.forget()
    # 无论现在是什么状态,都要停止
    # result.revoker(terminate=True)
    # 如果还没开始就停止
    result.revoker(terminate=False)
elif async_result.failed():
    print("执行失败")
elif async_result.status == "PENDING":
    print("任务等待中被执行")
elif async_result.status == "RETRY":
    print("任务异常后正在重试")
elif async_result.status == "STARTED":
    print("任务已经开始被执行")
  1. 多目录结构

目录的层级
├── celery_tasks: 存celery相关的文件
│ ├── init.py:初始化文件
│ ├── celery.py:引入celery包、任务包、celery相关配置的文件
│ ├── task01.py:任务1(函数)
│ └── task02.py:任务2(函数)
├── check_result.py:任务结果查询消费者文件
└── produce_task.py:任务生产者文件

(1)celery_tasks目录:

(1.1)init:因为vscode创建包有时候会识别不到,所以需要这文件内的代码

import sys
import os
sys.path.append(os.path.dirname(os.path.realpath(__file__)))

(1.2)celery.py:这个文件似乎只能以这个为命名,具体原因还不清楚,但是使用其他命名会出现报错,celery找不到模块

from celery import Celery

cel = Celery('celery_demo',
             broker='redis://127.0.0.1:6379/1',
             backend='redis://127.0.0.1:6379/2',
             # 包含以下两个任务文件,去相应的py文件中找任务,对多个任务做分类
             include=['celery_tasks.task01',
                      'celery_tasks.task02'
                      ])

时区
cel.conf.timezone = 'Asia/Shanghai'
是否使用UTC
cel.conf.enable_utc = False

(1.3)task文件

# task01

import time
from celery_tasks.celery import cel

@cel.task
def send_email(res):
    time.sleep(5)
    return "完成向%s发送邮件任务"%res

task02

import time
from celery_tasks.celery import cel

@cel.task
def send_msg(res):
    print("完成%s发送短信任务" % res)
    time.sleep(100)
    return "短信发送完成"

(2)check_result.py: 任务结果查询消费者

消费者
from celery.result import AsyncResult
from celery_tasks.celery import cel

async_result = AsyncResult(id='09a3939c-fe06-41d5-a5c7-7c3840ef3aa5', app=cel)

if async_result.successful():
    result = async_result.get()
    print(result)
    # 将结果删除,,执行完成,结果不会自动删除
    # result.forget()
    # 无论现在是什么状态,都要停止
    # result.revoker(terminate=True)
    # 如果还没开始就停止
    result.revoker(terminate=False)
elif async_result.failed():
    print("执行失败")
elif async_result.status == "PENDING":
    print("任务等待中被执行")
elif async_result.status == "RETRY":
    print("任务异常后正在重试")
elif async_result.status == "STARTED":
    print("任务已经开始被执行")

(3) produce_task.py: 任务生产者文件

from celery_tasks.task01 import send_email
from celery_tasks.task02 import send_msg
import time

start_time = time.time()

result = send_email.delay("yuan")
print(result.id)
result = send_msg.delay("alex")
print(result.id)

三、定时任务

  1. 单目录

(1)celery_task.py: celery启动的文件,里面包含配置、任务

定时的服务
import celery
import time

backend = "redis://127.0.0.1:6379/1" # 结果存储
broker = 'redis://127.0.0.1:6379/2' # 消息中间件

cel = celery.Celery('test', backend=backend, broker=broker)

@cel.task
def send_email(name):
    print("向%s发送邮箱。。。" % name)
    time.sleep(5)
    print("向%s发送邮箱完成..." % name)
    return "ok"

@cel.task
def send_msg(name):
    print("向%s发送短信。。。" % name)
    time.sleep(5)
    print("向%s发送短信完成..." % name)
    return "ok"

(2)produce_task.py: 定时任务消费者

from celery_task import send_email
from datetime import datetime

方式一
v1 = datetime(2022, 2, 17, 18, 00, 00)
print(v1)
v2 = datetime.utcfromtimestamp(v1.timestamp()) # 国际标准时间
print(v2)
result = send_email.apply_async(args=["egon",], eta=v2)
print(result.id)

方式二
ctime = datetime.now()
默认用utc时间
utc_ctime = datetime.utcfromtimestamp(ctime.timestamp())
from datetime import timedelta # 时差类
time_delay = timedelta(seconds=10)
task_time = utc_ctime + time_delay

使用apply_async并设定时间
result = send_email.apply_async(args=["egon"], eta=task_time)
print(result.id)
  1. 多目录

└── celery_tasks:存celery相关的文件
├── init.py:初始化文件
├── celery.py:引入celery包、任务包、celery相关配置的文件
├── task01.py:任务1(函数)
└── task02.py:任务2(函数)

定时任务的多目录和异步任务的多目录有一些不同,因为定时任务可以通过配置进行导入,前面说明过的目录就不再说明,因为代码都是一样的

(1)celery.py: 引入celery包、任务包、celery相关配置的文件

from datetime import timedelta
from celery import Celery
from celery.schedules import crontab

cel = Celery('tasks', broker='redis://127.0.0.1:6379/1', backend='redis://127.0.0.1:6379/2', include=[
    'celery_tasks.task01',
    'celery_tasks.task02',
])
cel.conf.timezone = 'Asia/Shanghai'
cel.conf.enable_utc = False

定时任务, 不用生产者,直接每6秒,将消息放入队列
cel.conf.beat_schedule = {
    # 名字随意命名
    'add-every-10-seconds': {
        # 执行tasks1下的test_celery函数
        'task': 'celery_tasks.task01.send_email',
        # 每隔2秒执行一次
        # 'schedule': 1.0,
        # 'schedule': crontab(minute="*/1"),
        'schedule': timedelta(seconds=6),
        # 传递参数
        'args': ('张三',)
    },
    # 'add-every-12-seconds': {
    #     'task': 'celery_tasks.task01.send_email',
    #     每年4月11号,8点42分执行
    #     'schedule': crontab(minute=42, hour=8, day_of_month=11, month_of_year=4),
    #     'args': ('张三',)
    # },
}

(2)task文件

task01

from celery_tasks.celery import cel
import time

@cel.task
def send_email(res):
    print("完成%s发送邮件任务" % res)
    time.sleep(5)
    return "邮件发送完成"

task02

import time
from celery_tasks.celery import cel

@cel.task
def send_msg(res):
    print("完成%s发送短信任务" % res)
    time.sleep(5)
    return "短信发送完成"

要启动定时任务得使用:celery beat命令去启动

总结点:celery4.x版本后不再支持window,如果需要使用得使用Linux;但是我们大部分使用的都是window系统,所以在win10版本中,可以下载linux子系统,在子系统中,可以共享win10的ip,这样子我们可以将环境配置到子系统,再用编辑器去调用子系统环境即可。当然会遇见很多bug和坑,但是要相信,今天踩的坑,就是你通往大佬的路,冲!

参考推荐博客:Mr·Yuan – 博客园

Original: https://blog.csdn.net/weixin_48609908/article/details/122986615
Author: hbase丶
Title: python-celery基本使用

原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/744226/

转载文章受原作者版权保护。转载请注明原作者出处!

(0)

大家都在看

亲爱的 Coder【最近整理,可免费获取】👉 最新必读书单  | 👏 面试题下载  | 🌎 免费的AI知识星球