flask 定时任务 flask-apscheduler

Flask-APScheduler介绍

Flask-APScheduler是基于APScheduler库开发的Flask拓展库。APScheduler的全称是Advanced Python Scheduler。允许您将Python代码安排为稍后执行,可以只执行一次,也可以定期执行。您可以随时添加新作业或删除旧作业。如果您将作业存储在数据库中,那么调度程序重启后它们也将存活下来并保持其状态。当调度器重新启动时,它将运行它在离线时应该运行的所有作业,APScheduler文档

pip install flask-apscheduler

实例展示

flask 定时任务 flask-apscheduler

flask 定时任务 flask-apscheduler

使用flask配置启动定时任务

APSchedule可以使用很多方式进行启动任务,比如interval,或者cron等等,下面就分别介绍一下这两种方式启动任务。

interval间隔时间执行

我们可以通过配置如下参数来每间隔多少时间来启动任务

JOBS = [
        {
            'id': 'job1',
            'func': 'scheduler:task',
            'args': (1, 2),
            'trigger': 'interval',
            'seconds': 10
        }
    ]

其中 func表示你要启动的函数, trigger表示触发方式,这里使用的 interval表示间隔触发, second表示间隔的时间长短。

我们可以通过flask配置启动定时任务,例子如下

from flask import Flask
import datetime
from flask_apscheduler import APScheduler

aps = APScheduler()

class Config(object):
    JOBS = [
        {
            'id': 'job1',
            'func': 'scheduler:task',
            'args': (1, 2),
            'trigger': 'interval',
            'seconds': 10
        }
    ]
    SCHEDULER_API_ENABLED = True

def task(a, b):
    print(str(datetime.datetime.now()) + ' execute task ' + '{}+{}={}'.format(a, b, a + b))

if __name__ == '__main__':
    app = Flask(__name__)
    app.config.from_object(Config())

    scheduler = APScheduler()
    scheduler.init_app(app)
    scheduler.start()

    app.run(port=8000)

上述代码中, 通过APScheduler每间隔10秒钟执行一次task函数。

cron启动任务

crontab是Linux中定时任务启动程序,我们可以通过配置crontab的配置文件来定时启动任务。在APScheduler中也可以通过cron的形式来定时启动任务。下载的例子来说明配置方式。

from flask import Flask
import datetime
from flask_apscheduler import APScheduler

aps = APScheduler()

class Config(object):
    JOBS = [
        {
            'id': 'job1',
            'func': 'scheduler:task',
            'args': (1, 2),
            'trigger': 'cron',
            'day': '*',
            'hour': '13',
            'minute': '16',
            'second': '20'
        }
    ]
    SCHEDULER_API_ENABLED = True

def task(a, b):
    print(str(datetime.datetime.now()) + ' execute task ' + '{}+{}={}'.format(a, b, a + b))

if __name__ == '__main__':
    app = Flask(__name__)
    app.config.from_object(Config())

    scheduler = APScheduler()
    scheduler.init_app(app)
    scheduler.start()

    app.run(port=8000)

上述的代码表示,在 _每天的13:16:20秒_启动 task()函数。其实看配置就能理解意思,一目了然,其中*代表任意的意思。

使用装饰器定时启动任务

除了上面通过配置的方式来启动定时任务外,我们还可以使用装饰器的方式来定时启动任务。例子如下所示

from flask import Flask
from flask_apscheduler import APScheduler
import datetime

class Config(object):
    SCHEDULER_API_ENABLED = True

scheduler = APScheduler()

interval examples
@scheduler.task('interval', id='do_job_1', seconds=30, misfire_grace_time=900)
def job1():
    print(str(datetime.datetime.now()) + ' Job 1 executed')

cron examples
@scheduler.task('cron', id='do_job_2', minute='*')
def job2():
    print(str(datetime.datetime.now()) + ' Job 2 executed')

@scheduler.task('cron', id='do_job_3', week='*', day_of_week='sun')
def job3():
    print(str(datetime.datetime.now()) + ' Job 3 executed')

@scheduler.task('cron', id='do_job_3', day='*', hour='13', minute='26', second='05')
def job4():
    print(str(datetime.datetime.now()) + ' Job 4 executed')

if __name__ == '__main__':
    app = Flask(__name__)
    app.config.from_object(Config())

    # it is also possible to enable the API directly
    # scheduler.api_enabled = True
    scheduler.init_app(app)
    scheduler.start()

    app.run(port=8000)

上述代码的含义如下:

  1. job1: 每间隔30s执行一次函数
  2. job2: 每分钟执行一次函数
  3. job3: 每周的星期天执行一次函数
  4. job4: 每天的13:26:05时刻执行一次函数

Original: https://blog.csdn.net/weixin_47315136/article/details/124995666
Author: 这是很长很好的一生
Title: flask 定时任务 flask-apscheduler

原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/750450/

转载文章受原作者版权保护。转载请注明原作者出处!

(0)

大家都在看

亲爱的 Coder【最近整理,可免费获取】👉 最新必读书单  | 👏 面试题下载  | 🌎 免费的AI知识星球