- 下面的代码构建了一个调度器, 实现了任务调度,任务删除,任务暂停,任务管理等功能
import datetime
import json
import os
import threading
import traceback
import logging
import apscheduler.jobstores.base
from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERROR, EVENT_JOB_SUBMITTED, EVENT_JOB_REMOVED
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.interval import IntervalTrigger
from apscheduler.triggers.cron import CronTrigger
from apscheduler.triggers.date import DateTrigger
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
from apscheduler.util import undefined
from db.sqlite.base import BaseDb
from common.base_model import Strategy, Trigger
import common
from utils import parse_argv
logger = logging.getLogger("HomedTools")
class Scheduler(BaseDb):
__table_name__ = "scheduler"
STATE_OPEN = 1
STATE_OFF = 0
def __init__(self):
super().__init__()
self.ThreadPool = ThreadPoolExecutor(20)
self.ProcessPool = ProcessPoolExecutor(10)
self.table_name = self.create_table(self.__table_name__, self._create_sql)
self.scheduler = BackgroundScheduler(executors={
'default': self.ThreadPool,
})
self.trigger_map = {
"cron": CronTrigger,
"interval": IntervalTrigger,
"date": DateTrigger}
self.strategy = dict()
self._load_strategy()
self.strategy_cursor = max(self.strategy) if self.strategy else 1001
self._start()
self.threadingLock = threading.Lock()
def feature(self):
pass
@property
def strategy_max(self):
with common.Lock:
self.strategy_cursor += 1
return self.strategy_cursor
@property
def _create_sql(self):
return ("CREATE TABLE {}
"
"("
"f_strategy_id INT(11) NOT NULL DEFAULT 1101, "
"f_strategy_name VARCHAR(256) NOT NULL DEFAULT '', "
"f_status INT(6) NOT NULL DEFAULT 1, "
"f_model_name VARCHAR(256) NOT NULL DEFAULT '', "
"f_trigger TEXT NOT NULL DEFAULT '', "
"f_genre VARCHAR(66) NOT NULL, "
"f_run_now INT(6) NOT NULL DEFAULT 0, "
"f_args TEXT NOT NULL DEFAULT '', "
"f_create_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, "
"PRIMARY KEY (f_strategy_id)"
"); ")
def _load_strategy(self):
sql = f"SELECT f_strategy_id, f_strategy_name, f_model_name, f_trigger, " \
f"f_genre, f_status, f_run_now, f_args FROM {self.table_name}
"
for each in self.execute(sql=sql):
strategy_id = each[0]
strategy_name = each[1]
model_name = each[2]
trigger = json.loads(each[3])
trigger["genre"] = each[4]
status = each[-3]
run_now = each[-2]
args = each[-1]
if model_name in common.modules.models:
strategy_obj = Strategy(
model=common.modules.models[model_name],
trigger=trigger,
strategy_id=strategy_id,
strategy_name=strategy_name,
status=status,
args=args
)
else:
strategy_obj = Strategy(
model=None,
trigger=trigger,
strategy_id=strategy_id,
strategy_name=strategy_name,
status=status,
args=args
)
logger.warning(f"[Scheduler:_load_strategy] {model_name} is not exist!!!")
self._work(strategy_obj, run_now=run_now, is_load=1)
logger.info(f"[Scheduler:_load_strategy] _load_strategy_end "
f"all_strategy_ram={self.strategy}, "
f"jobs={self.scheduler.get_jobs()}")
def add(self, *args):
logger.info(f"[Scheduler:add] 添加策略 args={args}")
strategy_id, strategy_name, model_name, run_now, trigger, status, params = args
strategy_obj = Strategy(
model=common.modules.models[model_name],
trigger=trigger,
strategy_id=strategy_id,
strategy_name=strategy_name,
status=status,
args=params
)
return self._work(strategy_obj, run_now=run_now)
def update(self, strategy_id, trigger, args):
return self.modify_strategy(strategy_id, trigger, args)
def no_repeat(self, model_name):
for k, v in self.strategy.items():
if v.model.model_name == model_name:
return v.strategy_id
@staticmethod
def job_id(strategy_id, pre="strategy"):
return f"{pre}_{strategy_id}"
@property
def all_jobs(self):
return [i.id for i in self.scheduler.get_jobs()]
def modify_strategy(self, strategy_id, trigger, args):
try:
if strategy_id in self.strategy:
strategy = self.strategy[strategy_id]
logger.info(f"[Scheduler:modify_strategy] strategy_trigger={strategy.trigger_fields}")
job_id = self.job_id(strategy_id)
trigger_base = Trigger(trigger=trigger)
genre, trigger = trigger_base.trigger_genre, trigger_base.trigger_fields
trigger_obj = self.trigger_map[genre](**trigger)
with self.threadingLock:
logger.info(f"[Scheduler:modify_strategy] scheduler_get_jobs={self.scheduler.get_jobs()}")
try:
self.scheduler.pause_job(job_id)
except apscheduler.jobstores.base.JobLookupError:
logger.info(f"[Scheduler:modify_strategy] no_have_job {job_id}, new one")
if strategy.trigger_genre != "date":
res = self.scheduler.add_job(func=strategy.model.job,
trigger=trigger_obj,
id=job_id,
args=args if args else None,
kwargs={"debug": 0},
next_run_time=undefined)
else:
res = self.scheduler.add_job(func=strategy.model.job,
trigger=trigger_obj,
id=job_id,
args=args if args else None,
kwargs={"debug": 0})
logger.info(f"[Scheduler:modify_strategy] strategy_work_success, "
f"res={res}, call={strategy.model.job}"
f" jobs={[i.func_ref for i in self.scheduler.get_jobs()]}")
else:
self.strategy[strategy_id].status = self.STATE_OFF
self.scheduler.modify_job(job_id=job_id, trigger=trigger_obj, args=args)
self.scheduler.resume_job(job_id)
self.strategy[strategy_id].status = self.STATE_OPEN
return 1
else:
return 0
except Exception:
logger.error(f"[Scheduler:modify_strategy] \n {traceback.format_exc()}")
return -1
def pause(self, strategy_id=None):
if strategy_id is None:
pause_flag = self.scheduler.pause()
else:
pause_flag = self.scheduler.pause_job(self.job_id(strategy_id))
dump_flag = self._set_status(strategy_id=strategy_id, status=0)
if all((pause_flag, dump_flag)):
return 1
else:
return 0
def _set_status(self, strategy_id=None, status=None):
sql = f"UPDATE {self.table_name}
SET f_status={status}"
if strategy_id:
self.strategy[strategy_id].status = status
dump_flag = self.execute(sql=sql+f" WHERE f_strategy_id={strategy_id}")
else:
for strategy_id, strategy_obj in self.strategy.items():
strategy_obj.status = status
dump_flag = self.execute(sql=sql)
return dump_flag
def resume(self, strategy_id=None):
if strategy_id is None:
pause_flag = self.scheduler.resume()
else:
pause_flag = self.scheduler.resume_job(job_id=self.job_id(strategy_id))
dump_flag = self._set_status(strategy_id=strategy_id, status=1)
if all((pause_flag, dump_flag)):
return 1
else:
return 0
def remove(self, strategy_id):
if strategy_id is None:
self.scheduler.remove_all_jobs()
else:
if strategy_id not in self.strategy:
logger.warning(f"[Scheduler:remove] strategy_no_exist {strategy_id}")
return 0
try:
job_id = self.job_id(strategy_id)
self.scheduler.remove_job(job_id=job_id)
except apscheduler.jobstores.base.JobLookupError:
logger.warning(f"[Scheduler:remove] strategy_no_in_job {strategy_id}")
self.strategy.pop(strategy_id)
return 1
def event_driven(self):
def job_error(event):
logger.error(f"[Scheduler:event_driven:job_error] job {event.job_id}, execute error \n {event.traceback} "
f"\n\n {event.exception}")
def job_start(event):
logger.info(f"[Scheduler:event_driven:job_start] job {event.job_id}")
def job_end(event):
logger.info(f"[Scheduler:event_driven:job_end] job {event.job_id}")
def job_delete(event):
logger.info(f"[Scheduler:event_driven:job_delete] job {event.job_id}")
self.scheduler.add_listener(job_start, EVENT_JOB_SUBMITTED)
self.scheduler.add_listener(job_error, EVENT_JOB_ERROR)
self.scheduler.add_listener(job_end, EVENT_JOB_EXECUTED)
self.scheduler.add_listener(job_delete, EVENT_JOB_REMOVED)
def _start(self):
self.event_driven()
self.scheduler.start()
logger.info('[[Scheduler:_start] 策略启动')
def _work(self, strategy: Strategy, run_now, is_load=0):
if strategy.model is None:
self.strategy[strategy.strategy_id] = strategy
return 1
if strategy.status == self.STATE_OFF:
logger.warning(f"[Scheduler:_work] strategy_is_off {strategy.strategy_name}")
self.strategy[strategy.strategy_id] = strategy
return 1
if strategy.trigger_genre is not None:
if is_load and strategy.trigger_genre == "date":
logger.warning(f"[Scheduler:_work] run_once_no_load {strategy.strategy_name} ")
self.strategy[strategy.strategy_id] = strategy
return 1
trigger = self.trigger_map[strategy.trigger_genre](
**strategy.trigger_fields)
args = strategy.args
logger.info(f"[Scheduler:_work] "
f"strategy_parse_success, id {strategy.strategy_id},"
f" name {strategy.strategy_name},"
f" genre {strategy.trigger_genre}"
f" fields_value {strategy.trigger_fields}"
f" args={args}"
)
if strategy.trigger_genre != "date":
res = self.scheduler.add_job(func=strategy.model.job,
trigger=trigger,
id=self.job_id(strategy.strategy_id),
args=args if args else None,
kwargs={"debug": 0},
next_run_time=datetime.datetime.now() if run_now else undefined)
else:
res = self.scheduler.add_job(func=strategy.model.job,
trigger=trigger,
id=self.job_id(strategy.strategy_id),
args=args if args else None,
kwargs={"debug": 0}
)
logger.info(f"[Scheduler:_work] strategy_work_success, run_now={run_now} res={res}, call={strategy.model.job}"
f" jobs={[i.func_ref for i in self.scheduler.get_jobs()]}")
self.strategy[strategy.strategy_id] = strategy
return 1
else:
logger.error(f'[Scheduler:_work] strategy_work_error, run_now={run_now}, id {strategy.strategy_id}, name {strategy.strategy_name},'
f' genre {strategy.trigger_genre}')
return 0
Original: https://blog.csdn.net/weixin_43380311/article/details/120726441
Author: Fighting-年轻人就该张牙舞爪
Title: Python定时任务 – apscheduler
原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/815723/
转载文章受原作者版权保护。转载请注明原作者出处!