Tushare数据本地化多进程版本——如何快速一次性获取全部股票数据

在我自己早期使用tushare进行股票数据获取的时候遇到一个问题:如何快速获取全市场所有股票的行情数据呢?

一般来说,我们可以采用for循环的方式:

def Get_stock_basicinfo():
    stock_basicinfo = pro.stock_basic(list_status='L')
    return stock_basicinfo

设置股票池
stock_list = list(stock_basic_info['ts_code'])

for stock in stock_list:
    # ---获取后复权数据---
    df = ts.pro_bar(ts_code=stock_code, adj='hfq', start_date='20000101')
    # ---存入数据---
    df.to_csv(os.path.join('stock_hfq', f'{stock_code}.csv'))
    print(df.head(3))

但是据我所知,根据我的个人经验,因为网络延迟等原因,i+1的股票要等到i获取完才会开始,最终用for循环我们大概需要十个小时才能获取全部4000+股票的数据。。。

于是我们迎来了多线程版本:

(我一次性贴上全部代码,但是其中很多只是功能代码可以忽略,着重关注Get_Data就行了!)

#!/usr/bin/env python3
-*- coding: utf-8 -*-
"""
Created on Tue Sep 15 21:02:08 2020

@author: 梧承
"""

import multiprocessing
import os
import pandas as pd
import time
import tushare as ts

def log(text):
    print('-' * 15)
    print(text)

def set_tushare(token):
    ts.set_token(token)
    pro = ts.pro_api()
    return pro

def set_done_code():
    work_dir = os.getcwd()
    file_path = os.path.join(work_dir, 'done_code.csv')
    flag = 0
    if not os.path.exists(file_path):
        # 如果不存在记录文件则创建
        done_code = pd.DataFrame(dict(done_code=[]))
        done_code.to_csv('done_code.csv')
        log('创建:done_code.csv')
    else:
        done_code = pd.read_csv(file_path, index_col=[0])
        if not done_code.empty:
            flag = input('是否继续上次未加载股票?确认1否认0:')
            if flag == 0:
                done_code = pd.DataFrame(dict(done_code=[]))
                done_code.to_csv('done_code.csv')
                log('已重置:done_code.csv')
        else:
            pass
    assert flag == 0 or 1
    return flag, list(done_code['done_code'])

def set_mkdir(dir_name):
    work_dir = os.getcwd()
    if not os.path.isdir(os.path.join(work_dir, dir_name)):
        os.mkdir(dir_name)
        log(f'已创建:{dir_name}')
    else:
        log(f'已存在:{dir_name}')

def record_done_code(stock_code):
    done_code = pd.read_csv('done_code.csv', index_col=[0])
    d_code = pd.DataFrame(dict(done_code=stock_code), index=[0])
    done_code.append(d_code, ignore_index=True)
    done_code.to_csv('done_code.csv')

def Get_stock_basicinfo():
    stock_basicinfo = pro.stock_basic(list_status='L')
    return stock_basicinfo

def Get_Data(stock_code):
    try:
        # ---获取数据---
        df = ts.pro_bar(ts_code=stock_code, adj='hfq', start_date='20000101')
        # ---存入数据---
        df.to_csv(os.path.join('stock_hfq', f'{stock_code}.csv'))
        # 记录已获取股票
        record_done_code(stock_code)
        time.sleep(1)
        print(df.head(3))
    except:
        print(f'未能获取数据:{stock_code}')
        pass

if __name__ == '__main__':
    set_mkdir('stock_hfq')
    # 设置token
    pro = set_tushare('你的token 可以从tushare网站获取')
    # 获取股票基本数据
    stock_basic_info = Get_stock_basicinfo()
    # 设置股票池
    flag, done_code = set_done_code()
    stock_list = list(stock_basic_info['ts_code'])
    # 如果flag==0则直接使用stock_list 否则使用stock_list与done_code的差集即未完成的股票
    code = stock_list if flag == 0 else stock_list.remove(done_code)
    # 设置进程池
    p = multiprocessing.Pool(8)
    b = p.map(Get_Data, code)
    p.close()
    p.join()

特别注明的是,你需要根据你的cpu核心数决定这一行代码的参数:

p = multiprocessing.Pool(8)

如果不出意外的话,用多线程方法时间将从十个小时缩短到十分钟左右。请把牛逼打在公屏上!

另外,我修改我之前的程序添加了从上次剩余为获取股票继续开始获取的功能,由于技术水平有限没有搞明白怎么在map里面添加两个参数来在内部传递done_code,因此稍微复杂地采用外部文件记录的方式来进行, 不过也算是可以使用吧!

在我学习Python的早期,是CSDN上很多慷慨无私分享的大神帮助我度过难关。很多知识的学习是个陡峭的学习过程,很肯能并不是这个知识有多么苦难,而是过于陡峭的曲线劝退了大多数人。

我知道Python是一个值得掌握的技能,因此我把这些内容分享出来,因为这就是互联网精神。希望给还在坑里面摸爬滚打的朋友一点光亮。

另外大家有兴趣可以关注我的个人公众号:梧承 Book House

Original: https://blog.csdn.net/weixin_44566452/article/details/120830727
Author: 梧承
Title: Tushare数据本地化多进程版本——如何快速一次性获取全部股票数据

原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/754434/

转载文章受原作者版权保护。转载请注明原作者出处!

(0)

大家都在看

亲爱的 Coder【最近整理,可免费获取】👉 最新必读书单  | 👏 面试题下载  | 🌎 免费的AI知识星球