文章目录
- 0. 安装redis、celery、flower
* - 0.1 安装redis
- 0.2 安装celery和flower
- 1. celery+flask代码结构及代码内容
* - 1.1 tasks.py
- 1.2 congfig.py
- 1.3 flask_service.py
- 1.4 init.py
- 2. 启动celery和flask服务
- 3. 启动flower-任务实时监控页面
-
安装redis、celery、flower
0.1 安装redis
可以参考 Redis 安装
0.2 安装celery和flower
conda install celery
pip install flower
- celery+flask代码结构及代码内容
我自己这个例子里代码目录结构如下:
; 1.1 tasks.py
下面的代码是celery服务的核心计算代码:计算一个list中所有元素的和;计算list中所有元素的乘积;
from celery import Celery
import time
import config
celery_app = Celery(config.celery_name,
broker=config.BROKER_URL,
backend=config.BACKEND_URL)
@celery_app.task
def add(input_list,k=10):
time.sleep(k)
return(sum(input_list))
1.2 congfig.py
celery_name = "test"
BACKEND_URL = "redis://localhost:6379/0"
BROKER_URL = "redis://localhost:6379/0"
PORT = 3000
1.3 flask_service.py
import json
from flask import Flask, request, jsonify
from celery import Celery
from tasks import add
import config
flask_app = Flask(__name__)
celery_app = Celery(config.celery_name,
broker=config.BROKER_URL,
backend=config.BACKEND_URL)
@flask_app.route(rule='/sum', methods=['POST'])
def add_service():
input_list = json.loads(request.data)['input_list']
k = json.loads(request.data)['k']
task = add.delay(input_list,k)
task_id = task.task_id
return jsonify({'code':200,"TaskId":task_id,"msg":"success"})
@flask_app.route(rule='/get_add', methods=['GET'])
def get_add():
task_id = json.loads(request.data)['TaskId']
task_result = celery_app.AsyncResult(task_id)
result = {
"code":200,
"msg":"success",
"TaskId": task_id,
"Status": task_result.status,
"Result": task_result.result
}
return jsonify(result)
if __name__ == '__main__':
flask_app.run(host='localhost', port=config.PORT, debug=True)
1.4 init .py
内容为空
- 启动celery和flask服务
在一个terminal里运行(路径要进入到app目录下面)celery:
celery -A tasks worker --loglevel=info
然后在另一个terminal里面运行flask服务,启动异步服务(在app目录下):
python3 flask_service.py
测试服务的代码在test_service.py里面,代码如下(要先请求求和的任务,然后每隔一秒钟请求结果服务,一共是两个服务):
import requests
import json
import time
from app.config import PORT
url = 'http://localhost:%d/sum'%PORT
d = {'input_list':[1,2,3,4,5],'k':5}
r = requests.post(url, data=json.dumps(d))
result = json.loads(r.content)
print(result)
task_id = result['TaskId']
url = 'http://localhost:%d/get_add'%PORT
d = {'TaskId':task_id}
while 1:
r = requests.get(url, data=json.dumps(d))
result = json.loads(r.content)
if result['Status'] == "SUCCESS":
print('add is ready ! Status = %s' % result['Status'], result)
break
else:
print('add NOT ready ! Status = %s' % result['Status'])
time.sleep(1)
- 启动flower-任务实时监控页面
网上有的教程说是 celery和 flower一起启动,我没试成功,我运行成功的是这个命令:
在上述操作完之后,再打开一个terminal,进入app目录下,运行:
celery -A tasks flower
然后可以打开界面 http://localhost:5555/ 查看任务:
说明1:运行这个flower的同时,前面的运行celery的 还要运行,也就是 celery -A tasks flower这个命令只是启动了flower。如果关闭前面的 celery -A tasks worker命令。 再提交测试服务的代码,会显示一直”PENDING”。
说明2:可以看到,上述的操作方式中,flower和celery是几乎独立的,先运行完celery,再运行flower就行。
- celery自带添加日志函数
使用celery自带了日志函数
from celery.utils.log import get_task_logger
celery_logger = get_task_logger(__name__)
celery_logger.info("日志内容")
Original: https://blog.csdn.net/qq_36810398/article/details/120421901
Author: 南瓜派三蔬
Title: python | 使用flask部署celery异步服务并用flower实时查看——初级代码教程
原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/746751/
转载文章受原作者版权保护。转载请注明原作者出处!