Python定时任务 – apscheduler

  • 下面的代码构建了一个调度器, 实现了任务调度,任务删除,任务暂停,任务管理等功能

import datetime
import json
import os
import threading
import traceback
import logging

import apscheduler.jobstores.base
from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERROR, EVENT_JOB_SUBMITTED, EVENT_JOB_REMOVED
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.interval import IntervalTrigger
from apscheduler.triggers.cron import CronTrigger
from apscheduler.triggers.date import DateTrigger
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
from apscheduler.util import undefined

from db.sqlite.base import BaseDb
from common.base_model import Strategy, Trigger
import common
from utils import parse_argv

logger = logging.getLogger("HomedTools")

class Scheduler(BaseDb):
    __table_name__ = "scheduler"
    STATE_OPEN = 1
    STATE_OFF = 0

    def __init__(self):
        super().__init__()
        self.ThreadPool = ThreadPoolExecutor(20)
        self.ProcessPool = ProcessPoolExecutor(10)

        self.table_name = self.create_table(self.__table_name__, self._create_sql)

        self.scheduler = BackgroundScheduler(executors={
            'default': self.ThreadPool,

        })
        self.trigger_map = {
            "cron": CronTrigger,
            "interval": IntervalTrigger,
            "date": DateTrigger}
        self.strategy = dict()
        self._load_strategy()
        self.strategy_cursor = max(self.strategy) if self.strategy else 1001
        self._start()
        self.threadingLock = threading.Lock()

    def feature(self):
        pass

    @property
    def strategy_max(self):
        with common.Lock:
            self.strategy_cursor += 1
            return self.strategy_cursor

    @property
    def _create_sql(self):
        return ("CREATE TABLE {} "
                "("
                "f_strategy_id   INT(11)   NOT NULL DEFAULT 1101, "
                "f_strategy_name  VARCHAR(256)   NOT NULL DEFAULT '', "
                "f_status  INT(6)   NOT NULL DEFAULT 1, "
                "f_model_name   VARCHAR(256)   NOT NULL DEFAULT '', "
                "f_trigger   TEXT  NOT NULL  DEFAULT '', "
                "f_genre   VARCHAR(66) NOT NULL, "
                "f_run_now   INT(6) NOT NULL DEFAULT 0, "
                "f_args   TEXT NOT NULL DEFAULT '', "
                "f_create_time  TIMESTAMP NOT NULL  DEFAULT CURRENT_TIMESTAMP, "
                "PRIMARY KEY (f_strategy_id)"
                "); ")

    def _load_strategy(self):

        sql = f"SELECT f_strategy_id, f_strategy_name, f_model_name, f_trigger, " \
            f"f_genre, f_status, f_run_now, f_args FROM {self.table_name}"
        for each in self.execute(sql=sql):
            strategy_id = each[0]
            strategy_name = each[1]
            model_name = each[2]
            trigger = json.loads(each[3])
            trigger["genre"] = each[4]
            status = each[-3]
            run_now = each[-2]
            args = each[-1]

            if model_name in common.modules.models:

                strategy_obj = Strategy(
                    model=common.modules.models[model_name],
                    trigger=trigger,
                    strategy_id=strategy_id,
                    strategy_name=strategy_name,
                    status=status,
                    args=args
                )

            else:
                strategy_obj = Strategy(
                    model=None,
                    trigger=trigger,
                    strategy_id=strategy_id,
                    strategy_name=strategy_name,
                    status=status,
                    args=args
                )
                logger.warning(f"[Scheduler:_load_strategy] {model_name} is not exist!!!")

            self._work(strategy_obj, run_now=run_now, is_load=1)

        logger.info(f"[Scheduler:_load_strategy] _load_strategy_end "
                    f"all_strategy_ram={self.strategy}, "
                    f"jobs={self.scheduler.get_jobs()}")

    def add(self, *args):
        logger.info(f"[Scheduler:add] 添加策略  args={args}")
        strategy_id, strategy_name, model_name, run_now, trigger, status, params = args
        strategy_obj = Strategy(
            model=common.modules.models[model_name],
            trigger=trigger,
            strategy_id=strategy_id,
            strategy_name=strategy_name,
            status=status,
            args=params
        )
        return self._work(strategy_obj, run_now=run_now)

    def update(self, strategy_id, trigger, args):
        return self.modify_strategy(strategy_id, trigger, args)

    def no_repeat(self, model_name):
        for k, v in self.strategy.items():
            if v.model.model_name == model_name:
                return v.strategy_id

    @staticmethod
    def job_id(strategy_id, pre="strategy"):
        return f"{pre}_{strategy_id}"

    @property
    def all_jobs(self):
        return [i.id for i in self.scheduler.get_jobs()]

    def modify_strategy(self, strategy_id, trigger, args):
        try:
            if strategy_id in self.strategy:
                strategy = self.strategy[strategy_id]

                logger.info(f"[Scheduler:modify_strategy] strategy_trigger={strategy.trigger_fields}")
                job_id = self.job_id(strategy_id)
                trigger_base = Trigger(trigger=trigger)
                genre, trigger = trigger_base.trigger_genre, trigger_base.trigger_fields

                trigger_obj = self.trigger_map[genre](**trigger)
                with self.threadingLock:

                    logger.info(f"[Scheduler:modify_strategy] scheduler_get_jobs={self.scheduler.get_jobs()}")
                    try:
                        self.scheduler.pause_job(job_id)
                    except apscheduler.jobstores.base.JobLookupError:
                        logger.info(f"[Scheduler:modify_strategy] no_have_job {job_id}, new one")
                        if strategy.trigger_genre != "date":
                            res = self.scheduler.add_job(func=strategy.model.job,
                                                         trigger=trigger_obj,
                                                         id=job_id,
                                                         args=args if args else None,
                                                         kwargs={"debug": 0},
                                                         next_run_time=undefined)
                        else:
                            res = self.scheduler.add_job(func=strategy.model.job,
                                                         trigger=trigger_obj,
                                                         id=job_id,
                                                         args=args if args else None,
                                                         kwargs={"debug": 0})
                        logger.info(f"[Scheduler:modify_strategy] strategy_work_success, "
                                    f"res={res}, call={strategy.model.job}"
                                    f" jobs={[i.func_ref for i in self.scheduler.get_jobs()]}")
                    else:
                        self.strategy[strategy_id].status = self.STATE_OFF

                        self.scheduler.modify_job(job_id=job_id, trigger=trigger_obj, args=args)

                        self.scheduler.resume_job(job_id)
                        self.strategy[strategy_id].status = self.STATE_OPEN
                    return 1
            else:

                return 0
        except Exception:
            logger.error(f"[Scheduler:modify_strategy] \n {traceback.format_exc()}")
            return -1

    def pause(self, strategy_id=None):
        if strategy_id is None:

            pause_flag = self.scheduler.pause()
        else:

            pause_flag = self.scheduler.pause_job(self.job_id(strategy_id))
        dump_flag = self._set_status(strategy_id=strategy_id, status=0)

        if all((pause_flag, dump_flag)):
            return 1
        else:
            return 0

    def _set_status(self, strategy_id=None, status=None):

        sql = f"UPDATE {self.table_name} SET f_status={status}"
        if strategy_id:
            self.strategy[strategy_id].status = status
            dump_flag = self.execute(sql=sql+f" WHERE f_strategy_id={strategy_id}")
        else:
            for strategy_id, strategy_obj in self.strategy.items():
                strategy_obj.status = status
            dump_flag = self.execute(sql=sql)
        return dump_flag

    def resume(self, strategy_id=None):
        if strategy_id is None:

            pause_flag = self.scheduler.resume()
        else:
            pause_flag = self.scheduler.resume_job(job_id=self.job_id(strategy_id))
        dump_flag = self._set_status(strategy_id=strategy_id, status=1)
        if all((pause_flag, dump_flag)):
            return 1
        else:
            return 0

    def remove(self, strategy_id):

        if strategy_id is None:

            self.scheduler.remove_all_jobs()
        else:
            if strategy_id not in self.strategy:
                logger.warning(f"[Scheduler:remove] strategy_no_exist {strategy_id}")
                return 0
            try:
                job_id = self.job_id(strategy_id)
                self.scheduler.remove_job(job_id=job_id)

            except apscheduler.jobstores.base.JobLookupError:

                logger.warning(f"[Scheduler:remove] strategy_no_in_job {strategy_id}")

            self.strategy.pop(strategy_id)
        return 1

    def event_driven(self):
        def job_error(event):

            logger.error(f"[Scheduler:event_driven:job_error] job {event.job_id}, execute error  \n {event.traceback} "
                         f"\n\n {event.exception}")

        def job_start(event):
            logger.info(f"[Scheduler:event_driven:job_start] job {event.job_id}")

        def job_end(event):
            logger.info(f"[Scheduler:event_driven:job_end] job {event.job_id}")

        def job_delete(event):
            logger.info(f"[Scheduler:event_driven:job_delete] job {event.job_id}")

        self.scheduler.add_listener(job_start, EVENT_JOB_SUBMITTED)
        self.scheduler.add_listener(job_error, EVENT_JOB_ERROR)
        self.scheduler.add_listener(job_end, EVENT_JOB_EXECUTED)
        self.scheduler.add_listener(job_delete, EVENT_JOB_REMOVED)

    def _start(self):
        self.event_driven()
        self.scheduler.start()
        logger.info('[[Scheduler:_start] 策略启动')

    def _work(self, strategy: Strategy, run_now, is_load=0):

        if strategy.model is None:
            self.strategy[strategy.strategy_id] = strategy
            return 1
        if strategy.status == self.STATE_OFF:
            logger.warning(f"[Scheduler:_work] strategy_is_off {strategy.strategy_name}")
            self.strategy[strategy.strategy_id] = strategy
            return 1
        if strategy.trigger_genre is not None:
            if is_load and strategy.trigger_genre == "date":
                logger.warning(f"[Scheduler:_work] run_once_no_load {strategy.strategy_name} ")
                self.strategy[strategy.strategy_id] = strategy
                return 1
            trigger = self.trigger_map[strategy.trigger_genre](
                **strategy.trigger_fields)
            args = strategy.args
            logger.info(f"[Scheduler:_work] "
                        f"strategy_parse_success, id {strategy.strategy_id},"
                        f" name {strategy.strategy_name},"
                        f" genre {strategy.trigger_genre}"
                        f" fields_value {strategy.trigger_fields}"
                        f" args={args}"
                        )

            if strategy.trigger_genre != "date":
                res = self.scheduler.add_job(func=strategy.model.job,
                                             trigger=trigger,
                                             id=self.job_id(strategy.strategy_id),
                                             args=args if args else None,
                                             kwargs={"debug": 0},
                                             next_run_time=datetime.datetime.now() if run_now else undefined)
            else:
                res = self.scheduler.add_job(func=strategy.model.job,
                                             trigger=trigger,
                                             id=self.job_id(strategy.strategy_id),
                                             args=args if args else None,
                                             kwargs={"debug": 0}
                                             )
            logger.info(f"[Scheduler:_work] strategy_work_success, run_now={run_now} res={res}, call={strategy.model.job}"
                        f" jobs={[i.func_ref for i in self.scheduler.get_jobs()]}")
            self.strategy[strategy.strategy_id] = strategy
            return 1
        else:
            logger.error(f'[Scheduler:_work] strategy_work_error, run_now={run_now}, id {strategy.strategy_id}, name {strategy.strategy_name},'
                         f' genre {strategy.trigger_genre}')
            return 0

Original: https://blog.csdn.net/weixin_43380311/article/details/120726441
Author: Fighting-年轻人就该张牙舞爪
Title: Python定时任务 – apscheduler

原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/815723/

转载文章受原作者版权保护。转载请注明原作者出处!

(0)

大家都在看

亲爱的 Coder【最近整理,可免费获取】👉 最新必读书单  | 👏 面试题下载  | 🌎 免费的AI知识星球