在要创建项目的文件夹中,按住 "shift+鼠标右键",点击 "在此处打开Powershell窗口",在窗口中输入以下命令
scrapy startproject 项目名
在用pycharm打开的项目的命令行窗口中输入
scrapy genspider spider的名称 www.xxx.com
USER_AGENT = r'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/92.0.4515.131 ' \
r'Safari/537.36 '
# 设置日志显示的级别
LOG_LEVEL = "ERROR"
# Obey robots.txt rules
ROBOTSTXT_OBEY = False
# Enable and configure the AutoThrottle extension (disabled by default)
# See https://docs.scrapy.org/en/latest/topics/autothrottle.html
AUTOTHROTTLE_ENABLED = True
# The initial download delay
AUTOTHROTTLE_START_DELAY = 5.0
# The maximum download delay to be set in case of high latencies
AUTOTHROTTLE_MAX_DELAY = 60.0
# The average number of requests Scrapy should be sending in parallel to
# each remote server
AUTOTHROTTLE_TARGET_CONCURRENCY = 1.0
# Enable showing throttling stats for every response received:
AUTOTHROTTLE_DEBUG = False
DOWNLOAD_WARNSIZE = 0
DOWNLOAD_TIMEOUT = 60*60
# Per-request overrides for downloading a very large file
# (fragment — belongs inside a spider callback; `UnitSpider.videoHeader`
# is defined elsewhere in the project).
meta = {
    'download_maxsize': 0,        # 0 disables Scrapy's per-request size cap
    'download_timeout': 60 * 60,  # allow up to one hour for this download
}
yield Request(url=item['file_urls'], meta=meta, headers=UnitSpider.videoHeader)
from dmd.spiders.unit import UnitSpider
from scrapy.pipelines.images import FilesPipeline
from scrapy import Request
from dmd.spiders.unit2 import UnitSpider2
class BigfilePipeline(FilesPipeline):
    """Files pipeline tuned for very large downloads.

    Disables the per-request size limit, allows a one-hour timeout, and
    stores each file under the name the spider supplied in ``item['filename']``.
    Failed downloads are appended to ``error.txt``.
    """

    def get_media_requests(self, item, info):
        # Per-request overrides: no size cap, long timeout.
        meta = {
            'filename': item['filename'],
            'download_maxsize': 0,
            'download_timeout': 60 * 60,
        }
        yield Request(url=item['file_urls'], meta=meta)

    def file_path(self, request, response=None, info=None, *, item=None):
        # Store under the exact name chosen by the spider.
        return request.meta['filename']

    def item_completed(self, results, item, info):
        """Log failed downloads.

        ``results`` is a list of ``(success, detail)`` tuples, one per media
        request.  The original code tested ``results[0]`` (the whole tuple,
        always truthy) and then indexed ``results[1]`` — it never detected
        a failure correctly; unpack each tuple instead.
        """
        for success, detail in results:
            if not success:
                # 'a' (not 'w+') so earlier failures are not overwritten.
                with open(r'error.txt', 'a', encoding='utf-8') as fp:
                    # On failure `detail` is typically a Failure object,
                    # not a dict — handle both defensively.
                    if isinstance(detail, dict):
                        url = detail.get('url', '')
                        status = detail.get('status', '')
                    else:
                        url = ''
                        status = detail
                    fp.write('错误url地址:' + str(url) + '\t')
                    fp.write('错误码:' + str(status) + '\n')
        return item
import os
from scrapy import cmdline
def main2():
    """Run the ``unit2`` spider, equivalent to: ``scrapy crawl unit2``."""
    cmdline.execute('scrapy crawl unit2'.split())


if __name__ == '__main__':
    # Bug fix: the guard previously called the undefined name ``main()``;
    # the only entry point defined in this script is ``main2``.
    main2()
scrapy crawl unit2
import os
from scrapy.crawler import CrawlerProcess
from dmd.spiders.unit2 import UnitSpider2
from scrapy.utils.project import get_project_settings
def main():
    """Configure project settings programmatically and run UnitSpider2.

    NOTE(review): ``errorFile`` and ``saveSuperPath`` are not defined in this
    snippet — they must exist in the enclosing module; confirm before running.
    """
    settingObject = get_project_settings()
    # Only log errors, and send them to a file instead of the console.
    settingObject.set('LOG_LEVEL', 'ERROR')
    settingObject.set('LOG_FILE', errorFile)
    # Route scraped items through the big-file pipeline.
    settingObject.set('ITEM_PIPELINES', {
        'dmd.pipelines.BigfilePipeline': 200,
    })
    settingObject.set('FILES_STORE', saveSuperPath)
    crawlerProcess = CrawlerProcess(settings=settingObject)
    # Configure the spider via class attributes before starting the crawl.
    UnitSpider2.start_urls = ['网站地址']
    UnitSpider2.savePath = saveSuperPath
    UnitSpider2.startIndex = 0
    UnitSpider2.endIndex = 14
    crawlerProcess.crawl(UnitSpider2)
    crawlerProcess.start()  # blocks until the crawl finishes
import sqlite3
class SqliteUtils:
    """Small convenience wrapper around :mod:`sqlite3`.

    Args:
        database: path to the database file, e.g. ``db/mydb.db``
            (``':memory:'`` also works).
    """

    def __init__(self, database):
        self._connection = sqlite3.connect(database)

    def _dict_factory(self, cursor, row):
        # sqlite3 row factory: map column names to values.
        return {col[0]: row[idx] for idx, col in enumerate(cursor.description)}

    def execute(self, sql, args=(), result_dict=True, commit=True) -> list:
        """Execute *sql* and return all resulting rows.

        Args:
            sql: SQL statement (may contain ``?`` placeholders).
            args: parameters for the placeholders.  (The original default was
                a mutable ``[]`` shared across calls — use a tuple instead.)
            result_dict: return rows as dicts keyed by column name instead
                of plain tuples.
            commit: commit the implicit transaction after executing.

        Returns:
            list of rows, e.g. ``[{'id': 1, 'name': '张三'}, {'id': 2, 'name': '李四'}]``
        """
        # The row factory must be set before the cursor is created.
        self._connection.row_factory = self._dict_factory if result_dict else None
        _cursor = self._connection.cursor()
        try:
            _cursor.execute(sql, args)
            if commit:
                self._connection.commit()
            return _cursor.fetchall()
        finally:
            # Close the cursor even if execution raised.
            _cursor.close()

    def commit(self):
        """Commit the current transaction."""
        self._connection.commit()

    def close(self):
        """Close the underlying connection."""
        self._connection.close()
if __name__ == '__main__':
    # Ad-hoc smoke test against a local database file.
    db = SqliteUtils('browser.db')
    # Column aliases: name -> userName, password -> pwd.
    print(db.execute("select rowid,id, name userName, password pwd from user"))
    print(db.execute("select * from user", result_dict=False))
    print(db.execute("select * from user"))
import asyncio
import os
import re
import subprocess
import time
class MergeTsFiles:
    """Merge downloaded ``.ts`` segments into one mp4 file using ffmpeg.

    The segment order is taken from the folder's ``.m3u8`` playlist.  A
    marker file is written after a successful merge so re-runs skip the
    folder unless forced.  Windows-specific: paths are split on ``'\\'``
    and commands must be gbk-encodable.

    Args:
        path: root folder containing the segments (scanned recursively).
        recurs: kept for backward compatibility (not read by the merge).
        completeFilename: base name of the "already merged" marker
            (``.txt`` is appended).
        m3u8Filename: playlist path to use; ``None`` picks the first
            ``.m3u8`` found in the folder.
        saveFilename: name of the final merged file.
        ffmpegPath: path to the ffmpeg executable.
    """

    def __init__(self, path, recurs=False, completeFilename='complete', m3u8Filename=None, saveFilename='result.mp4',
                 ffmpegPath=r"ffmpeg-4.3.2-2021-02-27-full_build\bin\ffmpeg.exe"):
        self.path = path
        self.recurs = recurs
        self.completeFilename = completeFilename + r'.txt'
        self.m3u8Filename = m3u8Filename
        self.saveFilename = saveFilename
        self.ffmpegPath = ffmpegPath

    def __merge3(self, path, isForce):
        m3u8file = ''
        tsFileList = []
        reference = {}
        isComplete = False
        # Scan the folder: find the playlist, collect .ts segments, note the
        # completion marker; recurse into sub-folders.
        for entry in os.listdir(path):
            entry = os.path.join(path, entry)
            if os.path.isdir(entry):
                self.__merge3(entry, isForce)
            elif os.path.isfile(entry):
                if m3u8file == '' and os.path.splitext(entry)[1] == '.m3u8':
                    if not self.m3u8Filename:
                        m3u8file = entry
                    elif entry == self.m3u8Filename:
                        m3u8file = entry
                elif os.path.splitext(entry)[1] == '.ts':
                    tsFileList.append(entry)
                elif entry.split('\\')[-1] == self.completeFilename:
                    isComplete = True
        if m3u8file == '' or len(tsFileList) == 0:
            return
        if not isForce and isComplete:
            return
        # Build a segment-name -> playback-order map from the playlist.
        with open(m3u8file, 'r', encoding='utf-8') as fp:
            cnt = 1
            while lineStr := fp.readline():
                lineStr = lineStr.strip()
                # Skip directives AND blank lines (indexing '' would raise).
                if not lineStr or lineStr[0] == '#':
                    continue
                # Escaped dot: the original r'[^/]+.ts' matched any character.
                reference[re.findall(r'([^/]+\.ts)$', lineStr)[0]] = cnt
                cnt += 1
        tsFileList = sorted(tsFileList, key=lambda x: reference[x.split('\\')[-1]])
        # Stage 1: concat the segments 100 at a time into tmp.<n>.mp4 files.
        mp4FileCnt = 0
        for i in range(0, len(tsFileList), 100):
            # Fix: the original line was a syntax error ("i + 100 len(...)");
            # the intent is to clamp the batch end to the list length.
            j = min(i + 100, len(tsFileList))
            tmpFile = os.path.join(path, str(time.time()) + '.txt')
            with open(tmpFile, 'w', encoding='utf-8') as fp:
                # Fix: the original reused ``i`` as the inner loop variable,
                # clobbering the batch index; use a distinct name.
                for tsFile in tsFileList[i:j]:
                    fp.writelines("file '{0}'\n".format(tsFile))
            cmdStr = self.ffmpegPath + r' -f concat -safe 0 -i {0} -c copy tmp.{1}.mp4'.format(tmpFile, mp4FileCnt)
            # gbk round-trip: fails early if the path cannot be shell-encoded.
            cmdStr = str(cmdStr.encode('gbk'), encoding='gbk')
            popen = subprocess.Popen(cmdStr, shell=True, cwd=path, stderr=subprocess.PIPE)
            popen.wait()
            os.remove(tmpFile)
            mp4FileCnt += 1
        # Stage 2: repeatedly merge the intermediates ten at a time
        # (mp4 -> ts via h264_mp4toannexb, then concat) until one remains.
        while mp4FileCnt != 1:
            newMp4FileCnt = 0
            for i in range(0, mp4FileCnt, 10):
                j = min(i + 10, mp4FileCnt)
                tmpFile = os.path.join(path, str(time.time()) + '.txt')
                deleteFilesList = []
                with open(tmpFile, 'w', encoding='utf-8') as fp:
                    for k in range(i, j):
                        cmdStr = self.ffmpegPath + r' -i tmp.{0}.mp4 -vcodec copy -acodec copy -vbsf h264_mp4toannexb ' \
                                                   r'tmp.{0}.ts'.format(k)
                        cmdStr = str(cmdStr.encode('gbk'), encoding='gbk')
                        popen = subprocess.Popen(cmdStr, shell=True, cwd=path, stderr=subprocess.PIPE)
                        popen.wait()
                        os.remove(path + '\\' + 'tmp.{0}.mp4'.format(k))
                        fp.writelines("file 'tmp.{0}.ts'\n".format(k))
                        deleteFilesList.append(path + '\\' + 'tmp.{0}.ts'.format(k))
                cmdStr = self.ffmpegPath + r' -f concat -safe 0 -i {0} -c copy tmp.{1}.mp4'.format(tmpFile,
                                                                                                  newMp4FileCnt)
                cmdStr = str(cmdStr.encode('gbk'), encoding='gbk')
                popen = subprocess.Popen(cmdStr, shell=True, cwd=path, stderr=subprocess.PIPE)
                popen.wait()
                os.remove(tmpFile)
                for k in deleteFilesList:
                    os.remove(k)
                newMp4FileCnt += 1
            mp4FileCnt = newMp4FileCnt
        os.rename(path + '\\' + 'tmp.0.mp4', path + '\\' + self.saveFilename)
        self.__complete(path)

    def __complete(self, path):
        # Write the marker file so subsequent runs skip this folder.
        print(r'合并完成:{0}'.format(path))
        with open(os.path.join(path, self.completeFilename), 'w', encoding='utf-8') as fp:
            fp.write(r'该文件夹的ts文件已经合并完成')

    def merge(self, isForce=False):
        """Merge everything under ``self.path``; ``isForce=True`` re-merges
        folders already marked complete."""
        self.__merge3(self.path, isForce)
if __name__ == '__main__':
    # Example: merge the ts segments of a downloaded series folder.
    # (``object`` shadows the builtin — rename when next touching this code.)
    object = MergeTsFiles(path=r'JOJO的奇妙冒险第三部')
    object.merge()
import os
import subprocess
class ZipFiles:
    """Create password-protected (optionally split-volume) RAR archives
    by shelling out to ``Rar.exe``.

    Windows-specific: paths are split on ``'\\'`` and command lines must be
    gbk-encodable.  Archives are written into ``path`` (created on demand).
    """

    def __init__(self, path, rarExePath=r'Rar.exe') -> None:
        # Output folder for the archives; created if missing.
        self.zipPath = path
        self.rarExePath = rarExePath
        if not os.path.exists(self.zipPath):
            os.makedirs(self.zipPath)

    def rarError(self, message):
        # Record stderr output from Rar.exe.
        # NOTE(review): 'w+' truncates the log, so when archiving many files
        # only the most recent error survives — consider append mode.
        with open(os.path.join(self.zipPath, r'error.log'), 'w+', encoding='utf-8') as fp:
            fp.write(message)

    def rar(self, unzip_path, password, single_volume=1024 ** 3, max_size=3 * 1024 ** 3, all_flag=True):
        """Archive ``unzip_path`` with Rar.exe.

        Args:
            unzip_path: file/folder to archive; with ``all_flag=True`` each
                entry inside it is archived separately.
            password: archive password (``-hp`` encrypts headers too).
            single_volume: volume size in bytes for split archives.
            max_size: inputs larger than this are split into volumes and
                placed in their own sub-folder.
            all_flag: archive each child of ``unzip_path`` (True) or
                ``unzip_path`` itself (False).
        """
        cmdStr = ''
        if all_flag:
            # Archive every entry inside unzip_path individually.
            for i in os.listdir(unzip_path):
                path = os.path.join(unzip_path, i)
                savePath = '"' + os.path.splitext(i)[0] + r'".rar'
                if os.path.getsize(path) > max_size:
                    # Large input: split into volumes inside a dedicated folder.
                    if not os.path.exists(os.path.join(self.zipPath, os.path.splitext(i)[0])):
                        os.makedirs(os.path.join(self.zipPath, os.path.splitext(i)[0]))
                    savePath = '"' + os.path.splitext(i)[0] + '"\\"' + os.path.splitext(i)[0] + r'".rar'
                    # -hp: encrypt data+headers, -ep: drop paths, -v: volume size.
                    cmdStr = self.rarExePath + r' a -hp{0} -ep -v{3}b {1} "{2}"'.format(password, savePath, path,
                                                                                       single_volume)
                else:
                    savePath = '"' + os.path.splitext(i)[0] + r'".rar'
                    cmdStr = self.rarExePath + r' a -hp{0} -ep {1} "{2}"'.format(password, savePath, path)
                # gbk round-trip: fails early if the command is not encodable.
                cmdStr = str(cmdStr.encode('gbk'), encoding='gbk')
                popen = subprocess.Popen(cmdStr, shell=True, cwd=self.zipPath, stderr=subprocess.PIPE,
                                         stdout=subprocess.PIPE)
                out, err = popen.communicate()
                if err != b'':
                    self.rarError(str(err, encoding='gbk'))
        else:
            # Archive unzip_path as a single input.
            if os.path.getsize(unzip_path) > max_size:
                zip_path = os.path.join(self.zipPath, os.path.splitext(unzip_path)[0].split('\\')[-1])
                if not os.path.exists(zip_path):
                    os.makedirs(zip_path)
                savePath = '"' + os.path.splitext(unzip_path)[0].split('\\')[-1] + '"\\"' + \
                           os.path.splitext(unzip_path)[0].split('\\')[-1] + r'".rar'
                cmdStr = self.rarExePath + r' a -hp{0} -ep -v{3}b {1} "{2}"'.format(password, savePath, unzip_path,
                                                                                   single_volume)
            else:
                savePath = '"' + os.path.splitext(unzip_path)[0].split('\\')[-1] + r'".rar'
                cmdStr = self.rarExePath + r' a -hp{0} -ep {1} "{2}"'.format(password, savePath, unzip_path)
            cmdStr = str(cmdStr.encode('gbk'), encoding='gbk')
            popen = subprocess.Popen(cmdStr, shell=True, cwd=self.zipPath, stderr=subprocess.PIPE,
                                     stdout=subprocess.PIPE)
            out, err = popen.communicate()
            if err != b'':
                self.rarError(str(err, encoding='gbk'))
if __name__ == '__main__':
    # Example: archive one file as an encrypted RAR, splitting into 1 GiB
    # volumes only if it exceeds 2 GiB.
    zipUtils = ZipFiles(r'压缩版')
    zipUtils.rar(r"文件.mp4", '123', single_volume=1024 ** 3,
                 max_size=2 * 1024 ** 3, all_flag=False)
在spider类的初始化方法中加入以下代码,有些细节需要自己修改
def __init__(self, name=None, **kwargs):
    """Open a per-spider SQLite database used to de-duplicate crawled URLs.

    NOTE(review): snippet assumes ``SqliteUtils`` and ``Unit1Spider.savePath``
    exist in the importing module — adapt the names before use.
    """
    super().__init__(name, **kwargs)
    self.db = SqliteUtils(Unit1Spider.savePath + r'\record.db')
    # One-column table of URLs that have already been crawled.
    createTableSql = '''CREATE TABLE IF NOT EXISTS record(
url VARCHAR
);'''
    self.db.execute(createTableSql, commit=True)
判断是否重复抓取的代码
result = self.db.execute('SELECT rowid,url from record where url=?', args=[videoUrl])
if len(result) == 0:
记录抓取完成的代码
spider.db.execute(r'INSERT INTO record (url) VALUES (?)', args=[item['url']], commit=True)
在spider类的结束方法中,关闭数据库
def closed(self, reason):
    """Scrapy calls this when the spider closes; release the database."""
    self.db.close()
在spider类的初始化方法中加入以下代码,有些细节需要自己修改
def __init__(self, name=None, **kwargs):
    """Start a headless Chrome instance shared by the spider's callbacks."""
    super().__init__(name, **kwargs)
    chrome_options = Options()
    chrome_options.add_argument('--headless')
    chrome_options.add_argument('--disable-gpu')
    # Hide the "Chrome is being controlled by automated software" banner.
    chrome_options.add_experimental_option('excludeSwitches', ['enable-automation'])
    path = r'chromedriver.exe'
    # NOTE(review): ``executable_path`` is deprecated in Selenium 4 (use a
    # Service object) — confirm which Selenium version the project pins.
    self.browser = webdriver.Chrome(executable_path=path, options=chrome_options)
selenium使用例子
# Selenium usage examples (loose snippets, mixing ``spider``/``self``):
spider.browser.get(response.url)
# NOTE(review): find_element_by_* / find_elements_by_* were removed in
# Selenium 4 — verify the project's Selenium version before reusing these.
div = self.browser.find_element_by_xpath(r'//div')
div.click()
userInput = self.browser.find_elements_by_xpath(r'//div//input[1]')[0]
userInput.send_keys("1234")
# Switch to the most recently opened tab/window.
self.browser.switch_to.window(self.browser.window_handles[-1])
在spider类的结束方法中,关闭selenium
def closed(self, reason):
    """Quit the shared Selenium browser when the spider closes."""
    self.browser.quit()
Original: https://blog.csdn.net/m0_46200304/article/details/122022285
Author: python苦命人
Title: scrapy爬虫常用代码,各种技巧
原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/816252/
转载文章受原作者版权保护。转载请注明原作者出处!