Apscheduler结合flask进行动态任务管理

​ Apscheduler是一个轻量级的 Python 定时任务调度框架。APScheduler 支持三种调度任务:固定时间间隔,固定时间点(日期),Linux 下的 Crontab 命令。同时,它还支持异步执行、后台执行调度任务。

pip install apscheduler
  • BlockingScheduler: 调度器在当前进程的主线程中运行,会阻塞当前线程。
  • BackgroundScheduler: 调度器在后台线程中运行,不会阻塞当前线程。
  • AsyncIOScheduler: 结合asyncio模块一起使用。
  • GeventScheduler: 程序中使用gevent作为IO模型和GeventExecutor配合使用。
  • TornadoScheduler: 程序中使用Tornado的IO模型,用 ioloop.add_timeout 完成定时唤醒。
  • TwistedScheduler: 配合TwistedExecutor,用reactor.callLater完成定时唤醒。
  • QtScheduler: 应用是一个Qt应用,需使用QTimer完成定时唤醒。

  • date是最基本的一种调度,作业任务只会执行一次。参数详见

  • interval触发器,固定时间间隔触发。参数详见
  • cron 触发器,在特定时间周期性地触发,和Linux crontab格式兼容。它是功能最强大的触发器。参数详见

执行器是执行调度任务的模块。最常用的 executor 有两种: ProcessPoolExecutorThreadPoolExecutor

存储预定的作业。默认job store只是将作业保存在内存中,但其他job store将它们存储在各种数据库中。作业的数据在保存到持久job store时被序列化,并在从它加载回来时被反序列化。job store(默认存储除外)不会将作业数据保存在内存中,而是充当中间人,用于在后端保存、加载、更新和搜索作业。job store绝不能在调度程序之间共享。

apscheduler在app创建时进行初始化, scheduler在初始化器类中进行创建,初始化器类采用单例设计, 在整个flask实例上下文中都是同一个对象,以满足通过接口进行动态更新任务的需求


from apscheduler.schedulers.background import BackgroundScheduler

def singleton(cls):
    _instance = {}

    def _singleton(*args, **kargs):
        if cls not in _instance:
            _instance[cls] = cls(*args, **kargs)
        return _instance[cls]
    return _singleton

@singleton
class Initializer:

    def __init__(self):
        self.scheduler = BackgroundScheduler()

scheduler初始化之后可以通过add_job添加一些固定的定时任务, 再通过start方法就可以正式启动定时任务


from flask_apscheduler import APScheduler
from src.initializer import Initializer

def create_app():
    app = Flask(__name__)
    app.config.from_object(Config)

    initializer = Initializer()
    update_job = {
        "id": "print_jobs",
        "func": "src.utils.common.scheduler:update_jobs",
        "args": (initializer.scheduler,),
        "trigger": "interval",
        "seconds": 10,
    }
    initializer.scheduler.add_job(**update_job)
    initializer.scheduler.start()
    return app

在初始化flask实例时进行了apscheduler单例化, 可以动态的通过flask接口进行定时任务的删除和新增.使用唯一的job_id进行任务的获取, 删除 和 新增.

from flask import current_app

@app.route("/test_apscheduler")
def test_apscheduler():
    """测试定时任务动态进行删除和新增任务"""

    ping_baidu = {
        "id": "ping_baidu",
        "func": "src.utils.common.scheduler:ping",
        "args": ("baidu.com",),
        "trigger": "interval",
        "seconds": 10,
    }
    job = current_app.apscheduler.get_job(job_id="ping")
    if job:
        current_app.apscheduler.remove_job(job_id="ping")

    current_app.apscheduler.add_job(**ping_baidu)
    return http_json({"msg": "ok"})

如上在进行flask初始化时有预设一个update_job的定时任务,传参为scheduler调度器, 每2分钟运行一次, 在内部实现任务从配置文件或数据库读取,然后进行任务的删除、更新、新增操作,以此达到定时任务的自我管理。


def update_job(scheduler):
    """定时任务自我管理,动态更新job"""
    jobs = scheduler.get_jobs()
    new_jobs, expired_jobs = get_job_from_db(jobs)
    for job in expired_jobs:
        scheduler.remove_job(job_id=job.id)
    for job in new_jobs:
        scheduler.add_job(job)

解决思路: 在启动定时任务时, 设置文件锁, 当不能获取到文件锁时, 不再启动任务


def create_app():
    app =Flask(__name__)

    scheduler_init(app)
    return app

def scheduler_init(app):
    """保证系统只启动一次定时任务"""
    if platform.system() != 'Windows':
        fcntl = __import__("fcntl")
        f = open('scheduler.lock', 'wb')
        try:
            fcntl.flock(f, fcntl.LOCK_EX | fcntl.LOCK_NB)
            scheduler.init_app(app)
            scheduler.start()
            app.logger.debug('Scheduler Started,---------------')
        except:
            pass

        def unlock():
            fcntl.flock(f, fcntl.LOCK_UN)
            f.close()

        atexit.register(unlock)
    else:
        msvcrt = __import__('msvcrt')
        f = open('scheduler.lock', 'wb')
        try:
            msvcrt.locking(f.fileno(), msvcrt.LK_NBLCK, 1)
            scheduler.init_app(app)
            scheduler.start()
            app.logger.debug('Scheduler Started,----------------')
        except:
            pass

        def _unlock_file():
            try:
                f.seek(0)
                msvcrt.locking(f.fileno(), msvcrt.LK_UNLCK, 1)
            except:
                pass

        atexit.register(_unlock_file)

apscheduler本身不支持分布式,因此分布式部署还是会出现重复启动问题.

Original: https://blog.csdn.net/u013896904/article/details/126383213
Author: WongBaba
Title: Apscheduler结合flask进行动态任务管理

原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/747123/

转载文章受原作者版权保护。转载请注明原作者出处!

(0)

大家都在看

亲爱的 Coder【最近整理,可免费获取】👉 最新必读书单  | 👏 面试题下载  | 🌎 免费的AI知识星球