Commonly used Scrapy crawler code and assorted tips

In the folder where you want to create the project, Shift + right-click, choose "Open PowerShell window here", and run the following command:

scrapy startproject <project_name>

Then, in the terminal of the project opened in PyCharm, generate a spider:

scrapy genspider <spider_name> www.xxx.com
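This creates a spider skeleton in the project's spiders package, roughly like the following (the exact template output may differ slightly between Scrapy versions):

import scrapy

class UnitSpider(scrapy.Spider):
    name = 'unit'
    allowed_domains = ['www.xxx.com']
    start_urls = ['http://www.xxx.com/']

    def parse(self, response):
        pass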
The following settings go in settings.py. Set a realistic User-Agent:

USER_AGENT = ('Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 '
              '(KHTML, like Gecko) Chrome/92.0.4515.131 Safari/537.36')

Set the log level:

LOG_LEVEL = "ERROR"
# Obey robots.txt rules
ROBOTSTXT_OBEY = False

# Enable and configure the AutoThrottle extension (disabled by default)
# See https://docs.scrapy.org/en/latest/topics/autothrottle.html
AUTOTHROTTLE_ENABLED = True
# The initial download delay
AUTOTHROTTLE_START_DELAY = 5.0
# The maximum download delay to be set in case of high latencies
AUTOTHROTTLE_MAX_DELAY = 60.0
# The average number of requests Scrapy should be sending in parallel to
# each remote server
AUTOTHROTTLE_TARGET_CONCURRENCY = 1.0
# Enable showing throttling stats for every response received:
AUTOTHROTTLE_DEBUG = False
# Disable the large-download warning (0 = no limit) and allow up to an hour per download
DOWNLOAD_WARNSIZE = 0
DOWNLOAD_TIMEOUT = 60 * 60
Both limits can also be overridden per request through the download_maxsize and download_timeout meta keys:

meta = {
    'download_maxsize': 0,
    'download_timeout': 60 * 60,
}
yield Request(url=item['file_urls'], meta=meta, headers=UnitSpider.videoHeader)
A custom FilesPipeline for large files: it saves each file under a per-item filename and logs failed downloads. Note that FilesPipeline lives in scrapy.pipelines.files:

from scrapy import Request
from scrapy.pipelines.files import FilesPipeline

class BigfilePipeline(FilesPipeline):

    def get_media_requests(self, item, info):
        # 'file_urls' holds a single URL string in this project
        meta = {
            'filename': item['filename'],
            'download_maxsize': 0,         # no size limit for this file
            'download_timeout': 60 * 60,   # allow up to an hour
        }
        yield Request(url=item['file_urls'], meta=meta)

    def file_path(self, request, response=None, info=None, *, item=None):
        # Save the file under the name carried in the request meta
        return request.meta['filename']

    def item_completed(self, results, item, info):
        # results is a list of (success, file_info_or_failure) tuples
        ok, result = results[0]
        if not ok:
            with open(r'error.txt', 'a', encoding='utf-8') as fp:
                fp.write('Failed url: ' + item['file_urls'] + '\t')
                fp.write('Reason: ' + str(result) + '\n')
        return item
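The snippets above assume an item carrying file_urls, filename, and url fields; a minimal items.py sketch consistent with them (the class name DmdItem is illustrative):

import scrapy

class DmdItem(scrapy.Item):
    file_urls = scrapy.Field()  # URL of the file to download (a single string here)
    filename = scrapy.Field()   # relative path used by file_path()
    url = scrapy.Field()        # page URL, recorded for de-duplication below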
To launch a spider from inside PyCharm, invoke Scrapy's command line programmatically; this is equivalent to running "scrapy crawl unit2" in a shell:

from scrapy import cmdline

def main():
    cmdline.execute('scrapy crawl unit2'.split())

if __name__ == '__main__':
    main()
A more flexible launcher uses CrawlerProcess, which lets you override settings and spider class attributes per run:

from scrapy.crawler import CrawlerProcess
from scrapy.utils.project import get_project_settings
from dmd.spiders.unit2 import UnitSpider2

saveSuperPath = r'downloads'   # example value: where downloaded files go
errorFile = r'error.log'       # example value: where ERROR-level log output goes

def main():
    settingObject = get_project_settings()
    settingObject.set('LOG_LEVEL', 'ERROR')
    settingObject.set('LOG_FILE', errorFile)
    settingObject.set('ITEM_PIPELINES', {
        'dmd.pipelines.BigfilePipeline': 200,
    })
    settingObject.set('FILES_STORE', saveSuperPath)
    crawlerProcess = CrawlerProcess(settings=settingObject)

    # Configure the spider through class attributes before scheduling it
    UnitSpider2.start_urls = ['<site url>']
    UnitSpider2.savePath = saveSuperPath
    UnitSpider2.startIndex = 0
    UnitSpider2.endIndex = 14

    crawlerProcess.crawl(UnitSpider2)
    crawlerProcess.start()   # blocks until the crawl finishes
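To avoid re-downloading the same URLs across runs, crawl history can be kept in a SQLite database. The helper class below wraps the standard sqlite3 module and is used by the de-duplication snippets further down: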
import sqlite3

class SqliteUtils:
    """
    Utility class for SQLite database operations.
    database: path to the database file, e.g. db/mydb.db
    """
    _connection = None

    def __init__(self, database):
        self._connection = sqlite3.connect(database)

    def _dict_factory(self, cursor, row):
        # Map each row to {column_name: value}
        d = {}
        for idx, col in enumerate(cursor.description):
            d[col[0]] = row[idx]
        return d

    def execute(self, sql, args=(), result_dict=True, commit=True) -> list:
        """
        General-purpose method for executing a SQL statement.
        Args:
            sql: the SQL statement
            args: statement parameters
            result_dict: whether to return each row as a dict
            commit: whether to commit the transaction
        Returns:
            a list of rows, e.g.:
            [{'id': 1, 'name': 'Zhang San'}, {'id': 2, 'name': 'Li Si'}]
        """
        if result_dict:
            self._connection.row_factory = self._dict_factory
        else:
            self._connection.row_factory = None

        _cursor = self._connection.cursor()

        _cursor.execute(sql, args)
        if commit:
            self._connection.commit()
        data = _cursor.fetchall()
        _cursor.close()
        return data

    def commit(self):
        self._connection.commit()

    def close(self):
        self._connection.close()

if __name__ == '__main__':
    db = SqliteUtils('browser.db')

    print(db.execute("select rowid,id, name userName, password pwd from user"))
    print(db.execute("select * from user", result_dict=False))
    print(db.execute("select * from user"))


Downloaded HLS video arrives as many small .ts segments. The helper below sorts them into playback order using the accompanying .m3u8 playlist and merges them into a single .mp4 with ffmpeg:

import os
import re
import subprocess
import time

class MergeTsFiles:
    def __init__(self, path, recurs=False, completeFilename='complete', m3u8Filename=None, saveFilename='result.mp4',
                 ffmpegPath=r"ffmpeg-4.3.2-2021-02-27-full_build\bin\ffmpeg.exe"):

        self.path = path
        self.recurs = recurs
        self.completeFilename = completeFilename + r'.txt'
        self.m3u8Filename = m3u8Filename
        self.saveFilename = saveFilename
        self.ffmpegPath = ffmpegPath

    def __merge3(self, path, isForce):
        m3u8file = ''
        tsFileList = []
        reference = {}
        isComplete = False
        for entry in os.listdir(path):
            entry = os.path.join(path, entry)
            if os.path.isdir(entry):
                self.__merge3(entry, isForce)
            elif os.path.isfile(entry):
                if m3u8file == '' and os.path.splitext(entry)[1] == '.m3u8':
                    if not self.m3u8Filename:
                        m3u8file = entry
                    elif entry == self.m3u8Filename:
                        m3u8file = entry
                elif os.path.splitext(entry)[1] == '.ts':
                    tsFileList.append(entry)
                elif entry.split('\\')[-1] == self.completeFilename:
                    isComplete = True
        if m3u8file == '' or len(tsFileList) == 0:
            return
        if not isForce and isComplete:
            return
        # Build a segment-name -> playlist-position map from the .m3u8 file
        with open(m3u8file, 'r', encoding='utf-8') as fp:
            cnt = 1
            while lineStr := fp.readline():
                lineStr = lineStr.strip()
                if not lineStr or lineStr[0] == '#':
                    continue
                reference[re.findall(r'([^/]+\.ts)$', lineStr)[0]] = cnt
                cnt += 1
        # Sort the .ts files into playback order
        tsFileList = sorted(tsFileList, key=lambda x: reference[x.split('\\')[-1]])
        mp4FileCnt = 0
        # First pass: concat the segments into intermediate mp4 files, 100 at a time
        for i in range(0, len(tsFileList), 100):
            j = min(i + 100, len(tsFileList))
            tmpFile = os.path.join(path, str(time.time()) + '.txt')
            with open(tmpFile, 'w', encoding='utf-8') as fp:
                for tsFile in tsFileList[i:j]:
                    fp.write("file '{0}'\n".format(tsFile))

            cmdStr = self.ffmpegPath + r' -f concat -safe 0 -i {0} -c copy tmp.{1}.mp4'.format(tmpFile, mp4FileCnt)
            cmdStr = str(cmdStr.encode('gbk'), encoding='gbk')
            popen = subprocess.Popen(cmdStr, shell=True, cwd=path, stderr=subprocess.PIPE)
            popen.wait()
            os.remove(tmpFile)
            mp4FileCnt += 1
        # Second pass: repeatedly merge the intermediate mp4 files, 10 at a
        # time, until a single file remains
        while mp4FileCnt != 1:
            newMp4FileCnt = 0
            for i in range(0, mp4FileCnt, 10):
                j = i + 10
                if j > mp4FileCnt:
                    j = mp4FileCnt
                tmpFile = os.path.join(path, str(time.time()) + '.txt')
                deleteFilesList = []
                with open(tmpFile, 'w', encoding='utf-8') as fp:
                    for k in range(i, j):

                        cmdStr = self.ffmpegPath + r' -i tmp.{0}.mp4 -vcodec copy -acodec copy -vbsf h264_mp4toannexb ' \
                                                   r'tmp.{0}.ts'.format(k)
                        cmdStr = str(cmdStr.encode('gbk'), encoding='gbk')
                        popen = subprocess.Popen(cmdStr, shell=True, cwd=path, stderr=subprocess.PIPE)
                        popen.wait()
                        os.remove(path + '\\' + 'tmp.{0}.mp4'.format(k))
                        fp.writelines("file 'tmp.{0}.ts'\n".format(k))
                        deletePathStr = path + '\\' + 'tmp.{0}.ts'.format(k)
                        deleteFilesList.append(deletePathStr)

                cmdStr = self.ffmpegPath + r' -f concat -safe 0 -i {0} -c copy tmp.{1}.mp4'.format(tmpFile,
                                                                                                   newMp4FileCnt)
                cmdStr = str(cmdStr.encode('gbk'), encoding='gbk')
                popen = subprocess.Popen(cmdStr, shell=True, cwd=path, stderr=subprocess.PIPE)
                popen.wait()
                os.remove(tmpFile)

                for k in deleteFilesList:
                    os.remove(k)
                newMp4FileCnt += 1
            mp4FileCnt = newMp4FileCnt
        os.rename(path + '\\' + 'tmp.0.mp4', path + '\\' + self.saveFilename)
        self.__complete(path)

    def __complete(self, path):
        print('Merge complete: {0}'.format(path))
        with open(os.path.join(path, self.completeFilename), 'w', encoding='utf-8') as fp:
            fp.write('The .ts files in this folder have already been merged')

    def merge(self, isForce=False):
        self.__merge3(self.path, isForce)

if __name__ == '__main__':
    merger = MergeTsFiles(path=r'JOJO的奇妙冒险第三部')
    merger.merge()
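A completion marker file is written into each folder after a successful merge, so re-running the script skips folders that are already done; call merge(isForce=True) to merge them again anyway.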


To password-protect and split the results for storage, the next helper drives WinRAR's Rar.exe. The switches used below: -hp<password> encrypts both file data and archive headers, -ep stores files without their directory paths, and -v<N>b splits the archive into N-byte volumes:

import os
import subprocess

class ZipFiles:
    def __init__(self, path, rarExePath=r'Rar.exe') -> None:

        self.zipPath = path

        self.rarExePath = rarExePath

        if not os.path.exists(self.zipPath):
            os.makedirs(self.zipPath)

    def rarError(self, message):
        # Append so that earlier errors are not overwritten
        with open(os.path.join(self.zipPath, r'error.log'), 'a', encoding='utf-8') as fp:
            fp.write(message)

    def rar(self, unzip_path, password, single_volume=1024 ** 3, max_size=3 * 1024 ** 3, all_flag=True):
        cmdStr = ''
        if all_flag:
            for i in os.listdir(unzip_path):
                path = os.path.join(unzip_path, i)
                if os.path.getsize(path) > max_size:
                    if not os.path.exists(os.path.join(self.zipPath, os.path.splitext(i)[0])):
                        os.makedirs(os.path.join(self.zipPath, os.path.splitext(i)[0]))
                    savePath = '"' + os.path.splitext(i)[0] + '"\\"' + os.path.splitext(i)[0] + r'".rar'
                    cmdStr = self.rarExePath + r' a -hp{0} -ep -v{3}b {1} "{2}"'.format(password, savePath, path,
                                                                                        single_volume)
                else:
                    savePath = '"' + os.path.splitext(i)[0] + r'".rar'
                    cmdStr = self.rarExePath + r' a -hp{0} -ep {1} "{2}"'.format(password, savePath, path)
                cmdStr = str(cmdStr.encode('gbk'), encoding='gbk')
                popen = subprocess.Popen(cmdStr, shell=True, cwd=self.zipPath, stderr=subprocess.PIPE,
                                         stdout=subprocess.PIPE)
                out, err = popen.communicate()

                if err != b'':
                    self.rarError(str(err, encoding='gbk'))
        else:
            if os.path.getsize(unzip_path) > max_size:
                zip_path = os.path.join(self.zipPath, os.path.splitext(unzip_path)[0].split('\\')[-1])
                if not os.path.exists(zip_path):
                    os.makedirs(zip_path)
                savePath = '"' + os.path.splitext(unzip_path)[0].split('\\')[-1] + '"\\"' + \
                           os.path.splitext(unzip_path)[0].split('\\')[-1] + r'".rar'
                cmdStr = self.rarExePath + r' a -hp{0} -ep -v{3}b {1} "{2}"'.format(password, savePath, unzip_path,
                                                                                    single_volume)
            else:
                savePath = '"' + os.path.splitext(unzip_path)[0].split('\\')[-1] + r'".rar'
                cmdStr = self.rarExePath + r' a -hp{0} -ep {1} "{2}"'.format(password, savePath, unzip_path)
            cmdStr = str(cmdStr.encode('gbk'), encoding='gbk')
            popen = subprocess.Popen(cmdStr, shell=True, cwd=self.zipPath, stderr=subprocess.PIPE,
                                     stdout=subprocess.PIPE)
            out, err = popen.communicate()

            if err != b'':
                self.rarError(str(err, encoding='gbk'))

if __name__ == '__main__':
    zipUtils = ZipFiles(r'compressed')
    zipUtils.rar(r"video.mp4", '123', single_volume=1024 ** 3,
                 max_size=2 * 1024 ** 3, all_flag=False)
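With all_flag=True, the path passed to rar() is treated as a directory and every entry in it is archived separately; with all_flag=False the path itself is archived. Anything larger than max_size is split into single_volume-byte volumes inside a subfolder named after the file.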

To skip URLs that have already been crawled, add the following to the spider's __init__ method (adjust the details to your project):

def __init__(self, name=None, **kwargs):
    super().__init__(name, **kwargs)
    self.db = SqliteUtils(Unit1Spider.savePath + r'\record.db')
    createTableSql = '''CREATE TABLE IF NOT EXISTS record(
        url VARCHAR
    );'''
    self.db.execute(createTableSql, commit=True)

Check whether a URL has already been crawled:

result = self.db.execute('SELECT rowid, url FROM record WHERE url=?', args=[videoUrl])
if len(result) == 0:
    # not seen before: go ahead and crawl it

Record a URL once it has been crawled successfully (here from a pipeline, hence spider.db):

spider.db.execute(r'INSERT INTO record (url) VALUES (?)', args=[item['url']], commit=True)

Scrapy calls the spider's closed(reason) method when the crawl ends; use it to close the database:

def closed(self, reason):
    self.db.close()

To drive JavaScript-heavy pages with Selenium, create the browser in the spider's __init__ method (imports go at the top of the spider module; again, adjust the details to your project):

from selenium import webdriver
from selenium.webdriver.chrome.options import Options

def __init__(self, name=None, **kwargs):
    super().__init__(name, **kwargs)

    chrome_options = Options()
    chrome_options.add_argument('--headless')      # run without a visible window
    chrome_options.add_argument('--disable-gpu')
    # Hide the "Chrome is being controlled by automated software" banner
    chrome_options.add_experimental_option('excludeSwitches', ['enable-automation'])

    path = r'chromedriver.exe'
    self.browser = webdriver.Chrome(executable_path=path, options=chrome_options)

Selenium usage examples:

# Load the page the spider just received
spider.browser.get(response.url)

# Locate an element and click it
div = self.browser.find_element_by_xpath(r'//div')
div.click()

# Fill in an input field
userInput = self.browser.find_elements_by_xpath(r'//div//input[1]')[0]
userInput.send_keys("1234")

# Switch to the most recently opened window/tab
self.browser.switch_to.window(self.browser.window_handles[-1])
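Pages often render asynchronously, so a wait is usually needed before locating elements. A minimal sketch using WebDriverWait (the 10-second timeout and the XPath are placeholder values):

from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait

element = WebDriverWait(self.browser, 10).until(
    EC.presence_of_element_located((By.XPATH, r'//div'))
)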

In the spider's closed method, shut Selenium down as well:

def closed(self, reason):
    self.browser.quit()

Original: https://blog.csdn.net/m0_46200304/article/details/122022285
Author: python苦命人
Title: scrapy爬虫常用代码,各种技巧

Original articles are protected by copyright; when reposting, please credit the source: https://www.johngo689.com/816252/

Reposted articles remain under the original author's copyright; when reposting, please credit the original author!
