Python schedule 库定时任务

Python schedule 库定时任务

schedule的使用

用于scrapy定时任务设置
import schedule
import time

def job():
    print("Do Jod", time.time())

schedule.every(10).minutes.do(job)
schedule.every().hour.do(job)
schedule.every().day.at("10:30").do(job)
schedule.every(5).to(10).day.do(job)
schedule.every().monday.do(job)
schedule.every().wednesday.at("13:15").do(job)

schedule在scrapy的应用

import subprocess, schedule, time, datetime, logging

from multiprocessing import Process
from scrapy import cmdline

def crawl_work():
    print("I'm working...")
    ## cmd = "scrapy crawl NanPing"
    ## subprocess.Popen(cmd)
    ## run(cmd, shell=True)
    ## pipe = subprocess.Popen(cmd, stdout=subprocess.PIPE).stdout
    ## print(pipe.read())
    print('-'*100)
    args = ["scrapy", "crawl", 'it']
    while True:
        start = time.time()
        p = Process(target=cmdline.execute, args=(args,))
        p.start()
        p.join()
        logging.debug("### use time: %s" % (time.time() - start))

if __name__=='__main__':
    print('*'*10+'开始执行定时爬虫'+'*'*10)
    schedule.every(1).minutes.do(crawl_work)
    print('当前时间为{}'.format(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
    print('*' * 10 + '定时爬虫开始运行' + '*' * 10)
    while True:
        schedule.run_pending()
        time.sleep(10)

Sqlalchemy连接postgresql

import os

from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session, sessionmaker

postgresql+psycopg2://username:password@host/dbname
engine = create_engine("postgresql+psycopg2://postgres:88595073@localhost/lecture3")
db = scoped_session(sessionmaker(bind=engine))

def main():
    result = db.execute("SELECT * FROM table_name").fetchall()
    for r in result:
        print(r)

if __name__ == "__main__":
    main()

Scrapy通过连接池连接mysql工具

import pymysql
import traceback
from DBUtils.PooledDB import PooledDB
from scrapy.utils.project import get_project_settings

class MySqlUtil(object):

    # 获取scrapy中settings的信息
    settings = get_project_settings()

    config = {
        "host": settings.get("MYSQL_HOST"),
        "port": settings.get("MYSQL_PORT"),
        "database": settings.get("MYSQL_DATABASE"),
        "user": settings.get("MYSQL_USER"),
        "password": settings.get("MYSQL_PASSWORD"),
        "charset": settings.get("MYSQL_CHARSET")
    }

"""
    MYSQL数据库对象,负责产生数据库连接
"""

    # 连接池对象
    __pool = None

    def __init__(self):
        self._conn = MysqlUtil.get_conn()
        self._cursor = self._conn.cursor()

    @staticmethod
    def get_conn():
"""
        @summary: 静态方法,从连接池中取出连接
        @return: MySQLdb.connection
"""
        if MysqlUtil.__pool is None:
            __pool = PooledDB(creator=pymysql, mincached=1, maxcached=20, host=MysqlUtil.config['host'], port=MysqlUtil.config['port'], user=MysqlUtil.config['user'], passwd=MysqlUtil.config['password'], db=MysqlUtil.config['database'], charset=MysqlUtil.config['charset']))
        return __pool.connection()

    def get_all(self, sql, param=None):
"""
        @summary: 执行查询, 并返回所有结果集
        @param sql: 查询sql,如果有查询条件,请指定参数列表,并使用[param]传入
        @param param: 可选参数,条件列表值(元组/列表)
        @return: result list(字典对象)/boolean 查询到的结果集
"""
        try:
            if param is None:
                count = self._cursor.execute(sql)
            else:
                count = self._cursor.execute(sql, param)
            if count > 0:
                result = self._cursor.fetchall()
            else:
                result = False
            return result
        except Exception as e:
            traceback.print_exc(e)

    def get_one(self, sql, param=None):
"""
        @summary: 执行查询, 并返回所有结果集
        @param sql: 查询sql,如果有查询条件,请指定参数列表,并使用[param]传入
        @param param: 可选参数,条件列表值(元组/列表)
        @return: result list/boolean 查询到的结果集
"""
        try:
            if param is None:
                count = self._cursor.execute(sql)
            else:
                count = self._cursor.execute(sql, param)
            if count > 0:
                result = self._cursor.fetchone()
            else:
                result = False
            return result
        except Exception as e:
            traceback.print_exc(e)

    def get_count(self, sql, param=None):
"""
        @summary: 执行查询, 并返回所有结果集
        @param sql: 查询sql,如果有查询条件,请指定参数列表,并使用[param]传入
        @param param: 可选参数,条件列表值(元组/列表)
        @return: result list/boolean 查询到的结果集
"""
        try:
            if param is None:
                count = self._cursor.execute(sql)
            else:
                count = self._cursor.execute(sql, param)
            return count
        except Exception as e:
            traceback.print_exc(e)

    def get_many(self, sql, num, param=None):
"""
        @summary: 执行查询,并取出num条结果
        @param sql:查询sql,如果有查询条件,请只指定条件列表,并将条件值使用参数[param]传递进来
        @param num:取得的结果条数
        @param param: 可选参数,条件列表值(元组/列表)
        @return: result list/boolean 查询到的结果集
"""
        try:
            if param is None:
                count = self._cursor.execute(sql)
            else:
                count = self._cursor.execute(sql, param)
            if count > 0:
                result = self._cursor.fetchmany(num)
            else:
                result = False
            return result
        except Exception as e:
            traceback.print_exc(e)

    def insert_one(self, sql, value):
"""
        @summary: 向数据表插入一条记录
        @param sql:要插入的sql格式
        @param value:要插入的记录数据tuple/list
        @return: insertId 受影响的行数
"""
        try:
            row_count = self._cursor.execute(sql, value)
            return row_count
        except Exception as e:
            traceback.print_exc(e)
            self.end("rollback")

    def insert_many(self, sql, values):
"""
        @summary: 向数据表插入多条记录
        @param sql:要插入的sql格式
        @param values:要插入的记录数据tuple(tuple)/list[list]
        @return: count 受影响的行数
"""
        try:
            row_count = self._cursor.executemany(sql, values)
            return row_count
        except Exception as e:
            traceback.print_exc(e)
            self.end("rollback")

    def __query(self, sql, param=None):
        try:
            if param is None:
                count = self._cursor.execute(sql)
            else:
                count = self._cursor.execute(sql, param)
            return count
        except Exception as e:
            traceback.print_exc(e)

    def update(self, sql, param=None):
"""
        @summary: 更新数据表记录
        @param sql: sql格式及条件,使用(%s,%s)
        @param param: 要更新的  值 tuple/list
        @return: count 受影响的行数
"""
        return self.__query(sql, param)

    def delete(self, sql, param=None):
"""
        @summary: 删除数据表记录
        @param sql: sql格式及条件,使用(%s,%s)
        @param param: 要删除的条件 值 tuple/list
        @return: count 受影响的行数
"""
        return self.__query(sql, param)

    def begin(self):
"""
        @summary: 开启事务
"""
        self._conn.autocommit(0)

    def end(self, option='commit'):
"""
        @summary: 结束事务
"""
        if option == 'commit':
            self._conn.commit()
        else:
            self._conn.rollback()

    def dispose(self, is_end=1):
"""
        @summary: 释放连接池资源
"""
        if is_end == 1:
            self.end('commit')
        else:
            self.end('rollback')
        self._cursor.close()
        self._conn.close()
调用 pipeline
from torrentSpider.utils.db_util import MysqlUtil
import traceback
import logging

class MySqlPipeline(object):
    pool = None

    def __init__(self):
        pass

    # 开启爬虫时链接数据库
    def open_spider(self, spider):
        self.pool = MysqlUtil()

    # 处理
    def process_item(self, item, spider):
        try:
            # 执行sql语句
            # sql = "select * from torrent_ye"
            # count = self.pool.get_all(sql, None)
            # print('查询数量为:' + str(count))

            # 先去数据库查询,查到了就不入库了
            sql_select = """select count(1) from torrent_ye where torrent_url = %(torrent_url)s"""
            params_select = {'torrent_url': item['torrent_url']}
            flag = self.pool.get_count(sql_select, params_select)
            if flag > 0:
                logging.info('记录已经存在:[%s][%s]', item['torrent_title'], item['torrent_url'])
                return

            sql_insert = """insert into torrent_ye(torrent_title, torrent_name, torrent_director,
            torrent_actor, torrent_language, torrent_type, torrent_region, torrent_update_time,
            torrent_status, torrent_show_time, torrent_introduction, torrent_url) values
             (%(torrent_title)s,%(torrent_name)s,%(torrent_director)s,%(torrent_actor)s,%(torrent_language)s,
             %(torrent_type)s,%(torrent_region)s,%(torrent_update_time)s,%(torrent_status)s,%(torrent_show_time)s,%(torrent_introduction)s,%(torrent_url)s)"""

            params = {'torrent_title': item['torrent_title'], 'torrent_name': item['torrent_name'],
                      'torrent_director': item['torrent_director'], 'torrent_actor': item['torrent_actor'],
                      'torrent_language': item['torrent_language'], 'torrent_type': item['torrent_type'],
                      'torrent_region': item['torrent_region'], 'torrent_update_time': item['torrent_update_time'],
                      'torrent_status': item['torrent_status'], 'torrent_show_time': item['torrent_show_time'],
                      'torrent_introduction': item['torrent_introduction'], 'torrent_url': item['torrent_url']}

            self.pool.insert_one(sql_insert, params)
            self.pool.end("commit")
        except Exception as e:
            logging.error('发生异常:[%s]', e)
            traceback.print_exc(e)
            self.pool.end("rollback")

    # 结束
    def close_spider(self, spider):
        pass

Original: https://www.cnblogs.com/outliver/p/15504628.html
Author: Grey_xx
Title: Python schedule 库定时任务

原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/606928/

转载文章受原作者版权保护。转载请注明原作者出处!

(0)

大家都在看

  • 5.2 Vi和Vim之间到底有什么关系?

    我们知道,Vi 编辑器是 Unix 系统最初的编辑器。它使用控制台图形模式来模拟文本编辑窗口,允许查看文件中的行、在文件中移动、插入、编辑和替换文本。 尽管 Vi 可能是世界上复杂…

    Linux 2023年6月7日
    089
  • PHP安装和部署

    一、关闭防火墙 二、安装EPEL源、REMI源、yum源管理工具、PHP 7.3 ①安装epel源 [root@localhost yum.repos.d]# yum instal…

    Linux 2023年6月7日
    0123
  • Java对象序列化和反序列化

    Java类的序列化和反序列化 序列化:指将对象转换为字节序列的过程,也就是将对象的信息转换成文件保存。 反序列化:将字节序列转换成目标对象的过程,也就是读取文件,并转换为对象。 几…

    Linux 2023年6月14日
    0102
  • 防止shell script多次运行

    防止shell script多次运行 一个思路是在script初期检测系统中是否存在同名进程。 if [ ps -ef | grep "test.sh" | g…

    Linux 2023年5月28日
    080
  • python学习(解析python官网会议安排)

    对html的解析是网页抓取的基础,分析抓取的结果找到自己想要的内容或标签以达到抓取的目的。 HTMLParser是python用来解析html的模块。它可以分析出html里面的标签…

    Linux 2023年6月14日
    0105
  • Shell脚本完成IOS平台下的多目录和多架构编译(调用Makefile一起完成)

    博客园 :当前访问的博文已被密码保护 请输入阅读密码: Original: https://www.cnblogs.com/cy568searchx/p/5735429.htmlA…

    Linux 2023年5月28日
    0112
  • jmeter接口关联-后置处理器(正则表达式)

    接口测试通过会涉及到两个接口之间的关联,前一个接口的返回数据作为到下一个接口的入参,这时候就需要用到后置处理器,其中有正则表达式提取器、XPath提取器、JSON提取器,今天先示范…

    Linux 2023年6月8日
    083
  • Linux查看服务器内存、磁盘、cpu、网络占用、端口占用情况

    1、查看物理CPU个数:cat cat /proc/cpuinfo | grep “physical id” | sort | uniq | wc -l2、…

    Linux 2023年6月13日
    0131
  • yum源安装nginx

    nginx使用yum源安装 安装步骤 使用yum源安装依赖 yum install yum-utils 配置nginx.repo的yum文件 vim /etc/yum.repos….

    Linux 2023年6月8日
    0113
  • VMware服务关闭后一定要重启

    重要的事情说三遍:服务暂时关闭记得重启,服务暂时关闭记得重启,服务暂时关闭记得重启!!! VMware服务由于安装补丁的需要我暂时把服务关闭了,于是我遇到了尴尬的一幕,于是乎发现上…

    Linux 2023年6月7日
    0119
  • 【EventOS Nano】EventOS Nano初步了解

    EventOS Nano是什么? EventOS Nano是一个面向单片机、事件驱动的、分布式的、可跨平台开发的嵌入式开发平台。主要有两大技术特色: 事件驱动和 超轻量 Event…

    Linux 2023年6月13日
    085
  • freePBR的UE4材质合集

    我手动下载了freepbr.com上的所有ue4材质,放到百度云上分享给大家。 freePBR的UE4材质合集 想开个新坑了。但工欲善其事必先利其器。于是我手动下载了freepbr…

    Linux 2023年6月6日
    094
  • Teleport&Suspense

    vue3 新添加了一个默认的组件就叫 Teleport,我们可以拿过来直接使用,它上面有一个 to 的属性,它接受一个css query selector 作为参数,这就是代表要把…

    Linux 2023年6月13日
    0104
  • 【MQTT】阿里云搭建MQTT物联网平台通信

    MQTT环境搭建和测试 物联网环境搭建 MQTT.fx使用 物联网环境搭建 1.首先进入阿里云官网注册并登录你的账号。2.点击控制台。3.在产品与服务下面搜索物联网平台4.点击公共…

    Linux 2023年6月13日
    083
  • 从前端走向后端

    每次过年回老家聚会,遇到不熟悉的亲戚朋友,经常被问到职业是什么。一开始,我总是很认真的回答这个问题,结果常常引出一番尴尬的问答。 “你&…

    Linux 2023年6月6日
    097
  • 每天一个 HTTP 状态码 103

    103 Earyly Hints 是被用于在最终的 HTTP 消息前返回一些响应头… 103 Early Hints 103 Earyly Hints 是被用于在最终 …

    Linux 2023年6月7日
    0111
亲爱的 Coder【最近整理,可免费获取】👉 最新必读书单  | 👏 面试题下载  | 🌎 免费的AI知识星球