Apscheduler是一个轻量级的 Python 定时任务调度框架。APScheduler 支持三种调度任务:固定时间间隔,固定时间点(日期),Linux 下的 Crontab 命令。同时,它还支持异步执行、后台执行调度任务。
pip install apscheduler
- BlockingScheduler: 调度器在当前进程的主线程中运行,会阻塞当前线程。
- BackgroundScheduler: 调度器在后台线程中运行,不会阻塞当前线程。
- AsyncIOScheduler: 结合asyncio模块一起使用。
- GeventScheduler: 程序中使用gevent作为IO模型和GeventExecutor配合使用。
- TornadoScheduler: 程序中使用Tornado的IO模型,用 ioloop.add_timeout 完成定时唤醒。
- TwistedScheduler: 配合TwistedExecutor,用reactor.callLater完成定时唤醒。
-
QtScheduler: 应用是一个Qt应用,需使用QTimer完成定时唤醒。
-
date
是最基本的一种调度,作业任务只会执行一次。参数详见 interval
触发器,固定时间间隔触发。参数详见cron
触发器,在特定时间周期性地触发,和Linux crontab格式兼容。它是功能最强大的触发器。参数详见
执行器是执行调度任务的模块。最常用的 executor 有两种: ProcessPoolExecutor
和 ThreadPoolExecutor
存储预定的作业。默认job store只是将作业保存在内存中,但其他job store将它们存储在各种数据库中。作业的数据在保存到持久job store时被序列化,并在从它加载回来时被反序列化。job store(默认存储除外)不会将作业数据保存在内存中,而是充当中间人,用于在后端保存、加载、更新和搜索作业。job store绝不能在调度程序之间共享。
apscheduler在app创建时进行初始化, scheduler在初始化器类中进行创建,初始化器类采用单例设计, 在整个flask实例上下文中都是同一个对象,以满足通过接口进行动态更新任务的需求
from apscheduler.schedulers.background import BackgroundScheduler
def singleton(cls):
_instance = {}
def _singleton(*args, **kargs):
if cls not in _instance:
_instance[cls] = cls(*args, **kargs)
return _instance[cls]
return _singleton
@singleton
class Initializer:
def __init__(self):
self.scheduler = BackgroundScheduler()
scheduler初始化之后可以通过add_job添加一些固定的定时任务, 再通过start方法就可以正式启动定时任务
from flask_apscheduler import APScheduler
from src.initializer import Initializer
def create_app():
app = Flask(__name__)
app.config.from_object(Config)
initializer = Initializer()
update_job = {
"id": "print_jobs",
"func": "src.utils.common.scheduler:update_jobs",
"args": (initializer.scheduler,),
"trigger": "interval",
"seconds": 10,
}
initializer.scheduler.add_job(**update_job)
initializer.scheduler.start()
return app
在初始化flask实例时进行了apscheduler单例化, 可以动态的通过flask接口进行定时任务的删除和新增.使用唯一的job_id进行任务的获取, 删除 和 新增.
from flask import current_app
@app.route("/test_apscheduler")
def test_apscheduler():
"""测试定时任务动态进行删除和新增任务"""
ping_baidu = {
"id": "ping_baidu",
"func": "src.utils.common.scheduler:ping",
"args": ("baidu.com",),
"trigger": "interval",
"seconds": 10,
}
job = current_app.apscheduler.get_job(job_id="ping")
if job:
current_app.apscheduler.remove_job(job_id="ping")
current_app.apscheduler.add_job(**ping_baidu)
return http_json({"msg": "ok"})
如上在进行flask初始化时有预设一个update_job的定时任务,传参为scheduler调度器, 每2分钟运行一次, 在内部实现任务从配置文件或数据库读取,然后进行任务的删除、更新、新增操作,以此达到定时任务的自我管理。
def update_job(scheduler):
"""定时任务自我管理,动态更新job"""
jobs = scheduler.get_jobs()
new_jobs, expired_jobs = get_job_from_db(jobs)
for job in expired_jobs:
scheduler.remove_job(job_id=job.id)
for job in new_jobs:
scheduler.add_job(job)
解决思路: 在启动定时任务时, 设置文件锁, 当不能获取到文件锁时, 不再启动任务
def create_app():
app =Flask(__name__)
scheduler_init(app)
return app
def scheduler_init(app):
"""保证系统只启动一次定时任务"""
if platform.system() != 'Windows':
fcntl = __import__("fcntl")
f = open('scheduler.lock', 'wb')
try:
fcntl.flock(f, fcntl.LOCK_EX | fcntl.LOCK_NB)
scheduler.init_app(app)
scheduler.start()
app.logger.debug('Scheduler Started,---------------')
except:
pass
def unlock():
fcntl.flock(f, fcntl.LOCK_UN)
f.close()
atexit.register(unlock)
else:
msvcrt = __import__('msvcrt')
f = open('scheduler.lock', 'wb')
try:
msvcrt.locking(f.fileno(), msvcrt.LK_NBLCK, 1)
scheduler.init_app(app)
scheduler.start()
app.logger.debug('Scheduler Started,----------------')
except:
pass
def _unlock_file():
try:
f.seek(0)
msvcrt.locking(f.fileno(), msvcrt.LK_UNLCK, 1)
except:
pass
atexit.register(_unlock_file)
apscheduler本身不支持分布式,因此分布式部署还是会出现重复启动问题.
Original: https://blog.csdn.net/u013896904/article/details/126383213
Author: WongBaba
Title: Apscheduler结合flask进行动态任务管理
原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/747123/
转载文章受原作者版权保护。转载请注明原作者出处!