Celery Study Notes

Table of Contents

Implementing async tasks with celery

Celery periodic tasks

Celery components

Instantiating celery

Sending a celery task

Task groups

Task chains

Overriding the celery base Task class

Celery commands

Celery monitoring

Celery is a simple, flexible, and reliable distributed system for processing large volumes of messages. It is an asynchronous task queue focused on real-time processing, and it also supports task scheduling.

Celery's architecture consists of three parts:

Message broker: celery does not provide a messaging service itself, but it integrates easily with third-party message brokers, including RabbitMQ, Redis, etc.

Worker (task execution unit): workers are the units celery provides for executing tasks; they run concurrently across the nodes of a distributed system.

Task result store: stores the results of the tasks executed by workers; celery supports storing task results in different backends, including AMQP, Redis, etc.

Celery also supports multiple concurrency models and serialization schemes:

Concurrency: prefork, eventlet, gevent, threads / solo (single-threaded);

Serialization: pickle, json, yaml, msgpack; zlib and bzip2 compression; cryptographic message signing, etc.
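For example, the serializer and compression scheme are chosen via configuration; a minimal sketch using celery's lowercase setting names (the app name and broker URL below are illustrative):

from celery import Celery

app = Celery('demo', broker='redis://127.0.0.1:6379/0')

# serialize task messages and results as json, compress message bodies with zlib
app.conf.update(
    task_serializer='json',
    result_serializer='json',
    accept_content=['json'],
    task_compression='zlib',
)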

Use cases:

Celery is a powerful asynchronous processing framework built on a distributed task queue. It decouples task execution completely from the main program, and tasks can even be dispatched to run on other hosts. We typically use it for asynchronous tasks (async task) and scheduled tasks (crontab).

Asynchronous tasks: hand time-consuming operations to celery for asynchronous execution, such as sending SMS / email, pushing notifications, or processing audio and video.

Scheduled tasks: perform a task periodically, such as daily data statistics.


Celery has the following advantages:

Simple: celery is easy to use and maintain, and requires no configuration files.

Highly available: workers and clients automatically retry when a network connection is lost or fails, and some brokers support dual-primary or primary/replica setups for high availability.

Fast: a single celery process can handle millions of tasks per minute, with millisecond-level round-trip latency (when using RabbitMQ, librabbitmq, and optimized settings).

Flexible: almost every part of celery can be extended or replaced: custom pool implementations, serializers, compression schemes, logging, schedulers, consumers, producers, broker transports, and more.

Installation:

pip install -U Celery ​

Fetching a task result:

from celery.result import AsyncResult
from celery_task import cel

async_result = AsyncResult(id='477c9d77-c62b-4fe5-9035-9087f7ad018a', app=cel)

if async_result.successful():
    result = async_result.get()
    print('executed successfully')
    # async_result.forget()                # delete the result
    # async_result.revoke(terminate=True)  # terminate the task no matter what state it is in
    # async_result.revoke()                # can only terminate the task if it has not started yet
elif async_result.failed():
    print('execution failed')
elif async_result.status == 'PENDING':
    print('task is waiting to be executed')
elif async_result.status == 'RETRY':
    print('task errored and is being retried')
elif async_result.status == 'STARTED':
    print('task has started executing')

Project layout: celery_tasks/{__init__.py, celery.py, task01.py, task02.py}

celery_tasks/celery.py

from celery import Celery

cel = Celery('celery_demo',
             broker='redis://127.0.0.1:6379/1',
             backend='redis://127.0.0.1:6379/2',
             include=['celery_tasks.task01', 'celery_tasks.task02'])

cel.conf.timezone = 'Asia/Shanghai'
cel.conf.enable_utc = False
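task01.py and task02.py hold the tasks themselves; a minimal sketch of what celery_tasks/task01.py might contain (the send_email task here is an assumption, chosen to match the apply_async calls used below):

celery_tasks/task01.py

import time
from celery_tasks.celery import cel

@cel.task
def send_email(name):
    # simulate a slow operation
    print('sending email to %s...' % name)
    time.sleep(5)
    return 'email to %s sent' % name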

check_result.py

from celery.result import AsyncResult
from celery_task import cel

res = AsyncResult(id='…', app=cel)

if res.successful():
    result = res.get()
    print(result)
elif res.failed():
    print('failed')
elif res.status == 'PENDING':
    print('task is waiting to be executed')
elif res.status == 'RETRY':
    print('task errored and is being retried')
elif res.status == 'STARTED':
    print('task has started executing')

from datetime import datetime, timedelta
# assuming send_email is defined in celery_tasks/task01.py as sketched above
from celery_tasks.task01 import send_email

ctime = datetime.now()
utc_ctime = datetime.utcfromtimestamp(ctime.timestamp())
time_delay = timedelta(seconds=10)
task_time = utc_ctime + time_delay
result = send_email.apply_async(args=['egon'], eta=task_time)  # with eta set, this is a scheduled task
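Equivalently, countdown= delays execution by a number of seconds without computing a UTC timestamp by hand:

result = send_email.apply_async(args=['egon'], countdown=10)  # run 10 seconds from now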

Implementing async tasks with celery:

Celery communicates via messages; dedicated worker processes continuously monitor the task queues for new work to perform.

Celery needs a message transport to send and receive messages; the RabbitMQ and Redis broker transports are feature-complete, but other solutions are supported as well. Here we use Redis.

pip install celery eventlet redis  # eventlet is only needed on Windows

tasks.py  # defines the tasks; main.py triggers them

from celery import Celery
import time

celery = Celery("tasks",
                broker="redis://:ane56pda@10.10.101.47:6378/0",
                backend="redis://:ane56pda@10.10.101.47:6378/0")

@celery.task  # with this decorator, the function becomes a celery task
def send_mail():
    print('starting to send email...')
    time.sleep(10)
    print('email sent!')

main.py  # when run, it returns immediately instead of blocking for 10 s

from tasks import send_mail

if __name__ == '__main__':
    res = send_mail.delay()  # called this way, the task runs asynchronously without blocking

Notes on the calling API:

.delay() is a shortcut for .apply_async(); .apply_async((2, 2), queue='lopri', countdown=10) additionally specifies the arguments, the execution time, and the task queue. Both return an AsyncResult instance that can be used to track the task.

res.ready() checks whether the task has finished; res.get(timeout=1) turns the asynchronous call into a synchronous one; if the task raised an exception, get() re-raises it, which res.get(propagate=False) overrides; res.traceback retrieves the traceback.

res.id gets the task ID; res.failed() and res.successful() check whether the task failed or succeeded; res.state moves through PENDING -> STARTED -> SUCCESS, plus RETRY.
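Those AsyncResult calls put together, as a small sketch against the tasks.py above:

from tasks import send_mail

res = send_mail.delay()
print(res.id)       # task ID
print(res.ready())  # has the task finished yet?
print(res.state)    # PENDING -> STARTED -> SUCCESS (or RETRY / FAILURE)
result = res.get(timeout=30, propagate=False)  # block until done; don't re-raise task exceptions
if res.failed():
    print(res.traceback)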

celery -A tasks.celery --pool=eventlet worker --loglevel=info  # on Linux, omit --pool

Note:

broker (middleman): the queue where tasks are stored

worker: the actual task executor; to run a worker manually: celery -A tasks.celery worker --loglevel=info

backend: stores the results after tasks finish

redis://:password@hostname:port/db_number

注:​

Run a worker ​

If you jumped in and already executed the above code you will be disappointed to learn that .wait() will never actually return. That’s because you also need to run a Celery worker to receive and execute the task.​

$ celery -A your_application.celery worker ​

The your_application string has to point to your application’s package or module that creates the celery object.​

Now that the worker is running, wait will return the result once the task is finished.​

Celery periodic tasks:

Production setup (required packages):

redis ​

celery ​

django-celery-results ​

django-celery-beat ​

eventlet ​

INSTALLED_APPS = [
    ...
    'django_celery_results',
    'django_celery_beat',
]

python manage.py makemigrations django_celery_beat ​

python manage.py migrate django_celery_beat ​

python manage.py makemigrations django_celery_results ​

python manage.py migrate django_celery_results ​

CELERY_BROKER_URL = 'redis://127.0.0.1:6379/0'
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/0'
CELERY_RESULT_BACKEND = 'django-db'  # alternative: store results in the database via django-celery-results
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TIMEZONE = TIME_ZONE

celery.py

from __future__ import absolute_import, unicode_literals
import os
from celery import Celery, platforms
from celery.schedules import crontab

# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'yto_monitor.settings_prd')

# either pass broker/backend explicitly:
app = Celery('yto_monitor', broker='redis://127.0.0.1:6379/', backend='redis://127.0.0.1:6379/')
# or rely on the Django settings loaded below:
app = Celery('yto_monitor')

# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# namespace='CELERY' means all celery-related configuration keys
# should have a CELERY_ prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')

# Load task modules from all registered Django app configs.
app.autodiscover_tasks()

@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))

platforms.C_FORCE_ROOT = True  # allow running the worker as root

app.conf.beat_schedule = {
    'add-task': {
        'task': 'midmonitor.tasks.add',
        'schedule': crontab(minute=30, hour=0),  # alternatives: crontab(hour=6, minute=0, day_of_month='1'), datetime.timedelta(seconds=20)
        'args': (5, 6),
    },
}

app.conf.timezone = 'Asia/Shanghai'

midmonitor/tasks.py

# Create your tasks here
from __future__ import absolute_import, unicode_literals
from celery import shared_task

@shared_task
def inspect_redis():
    pass
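A possible body for inspect_redis, as a sketch only: the Redis connection details below are illustrative assumptions, not the original implementation.

import redis  # pip install redis

@shared_task
def inspect_redis():
    # ping a Redis instance and report whether it is reachable (illustrative)
    client = redis.StrictRedis(host='127.0.0.1', port=6379, db=0)
    return client.ping()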

celery -A yto_monitor --pool=eventlet worker -l info  # consume and execute tasks

celery -A yto_monitor beat -l info  # dispatch scheduled tasks

/usr/local/python368/bin/celery --workdir=/data/app/yto_monitor -A yto_monitor worker --loglevel=info

/usr/local/python368/bin/celery --workdir=/data/app/yto_monitor -A yto_monitor beat --loglevel=info

/usr/local/python368/bin/uwsgi --ini /data/app/yto_monitor/yto_monitor/uwsgi_test.ini

Celery components:

worker (task executor): executes the actual tasks; can be deployed on multiple servers for scaling; in this project we develop in python

broker (middleman): implements task dispatching, worker management, etc.; supports RabbitMQ, Redis, Zookeeper, and other middleware; in this project we use redis

backend: stores task results; in this project we use redis

application: the instantiated celery app

tasks: the tasks that make up the application

Instantiating celery:

The most minimal celery application, specifying a broker and a backend:

from celery import Celery

# broker and backend: the task middleman and the location where results are stored
BROKER = "redis://:@127.0.0.1:6379/3"
BACKEND = "redis://:@127.0.0.1:6379/4"

app = Celery("tasks", broker=BROKER, backend=BACKEND)

# define a task named add
@app.task
def add(x, y):
    c = x + y
    print('result: %d' % c)
    return c

@app.task(bind=True, max_retries=3)  # retry at most 3 times
def test_retry(self):
    print('running celery retry')
    raise self.retry(countdown=1)  # retry after 1 second

@app.task(bind=True)
def test_fail(self):
    print('running celery failure')
    raise RuntimeError('testing celery failure')

Sending a celery task:

test_sender.py

# script that sends celery tasks
from test_celery import *

# the simplest way to push a task; no options supported
add.delay(3, 6)

# push a task; the first parameter is the argument tuple;
# if the task takes a single argument, a trailing comma is required: (var1,)
# countdown=10: start executing after 10 seconds
add.apply_async((2, 5), countdown=10)

# other ways to pass the arguments
add.apply_async(kwargs={'x': 4, 'y': 8})
add.s(5, 6).apply_async()

# task failure and retry
test_retry.apply_async()

Task groups:

Append to test_sender.py:

# task group
from celery import group

numbers = [(2, 2), (4, 4), (8, 8), (16, 16)]
res = group(add.subtask(i) for i in numbers).apply_async()
print(res.get())

Task chains:

Append to test_sender.py:

# with link, the task result is passed as the first argument to the next task
add.apply_async((2, 3), link=add.s(16))

# similarly, with chain the result of the previous task becomes the first argument of the next task
from celery import chain
res = chain(add.s(2, 2), add.s(4), add.s(8))()
print(res.get())

# the same chain using the pipe operator
(add.s(2, 2) | add.s(4) | add.s(8))().get()

Overriding the celery base Task class:

celery events: monitor celery-related events; can be used to build alerting for worker processes, failed tasks, etc.;

scheduled tasks;
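A minimal sketch of overriding the Task base class to hook failures for alerting; the print call stands in for whatever notification mechanism is used (an assumption, not the original code):

from celery import Celery, Task

app = Celery('demo', broker='redis://127.0.0.1:6379/0')

class BaseTask(Task):
    # celery calls this hook when a task fails; a natural place for failure alerts
    def on_failure(self, exc, task_id, args, kwargs, einfo):
        print('task %s failed: %r' % (task_id, exc))  # placeholder: send an alert here
        super().on_failure(exc, task_id, args, kwargs, einfo)

@app.task(base=BaseTask)
def add(x, y):
    return x + y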

Celery commands:

celery -A test_celery worker -l info  # the worker occupies the terminal; use -D to run it in the background. A worker contains multiple child processes, defaulting to the CPU core count; -c sets the number of child processes; -Q queue_name1,queue_name2 names the queues to consume (comma-separated); push a task to a specific queue with add.apply_async((10, 20), queue='queue_name1')

celery -A test_celery report  # show information about the celery environment

celery -A test_celery inspect active_queues  # list active queues

celery -A test_celery inspect stats  # worker statistics

celery -A test_celery inspect report  # inspection report

celery -A test_celery purge  # purge the tasks waiting in the queues

celery -A test_celery inspect ping  # ping the workers

celery -A test_celery control shutdown  # shut down the worker processes

celery -A test_celery worker --autoscale=10,2 -n au  # autoscale the pool size (max 10, min 2 processes)

celery -A test_celery status  # list the live nodes in the worker cluster

vim celery.py

app.conf.task_routes = {'cmdb.api.v_center.get_cluster_data': {'queue': 'vcenter'}}

/usr/local/python368/bin/celery --workdir=/data/app/yto_monitor -A yto_monitor worker -Q vcenter,celery -l info  # -Q names the queues to consume; 'celery' is the default queue

Celery monitoring:

pip install flower  # install the celery monitoring plugin

celery multi start 3 -A test_celery -l info -c 4 --pidfile=tmp/celery_%n.pid -f logs/celery.log  # start 3 workers, each with 4 child processes

celery flower -A test_celery --port=8080

/usr/local/python368/bin/celery flower --workdir=/data/app/yto_monitor -A yto_monitor --port=8080

Original: https://blog.51cto.com/jowin/5569753
Author: chaijowin
Title: celery学习笔记
