按日更新股票数据——从零到实盘13

按日更新股票数据——从零到实盘13

前文介绍了多进程创建股票数据的过程,整个创建过程大概约10几分钟。在实盘时,每个交易日都有新数据生成,我们没有必要对全面历史时间都进行重新创建计算,只需要下载新产生的日线数据,每次更新需要3分钟左右。本文记录多进程按日更新股票数据的过程。

; 主要代码分析

新建源文件,命名为data_center_v11.py,全部内容见文末,v11新增2个函数:

新增更新数据函数

def update_data(stock_codes, query_days=60, adjustflag='2'):

该函数用于更新日线数据,计算相关因子,其中:

  • 参数stock_codes为新数据的股票代码
  • 参数query_days为在数据库中查询历史日线数据的天数,用于计算扩展因子,需要根据扩展因子设置,这里要计算60日均线,所以最小设置为60
  • 参数adjustflag为复权选项,为1时表示后复权,为2时表示前复权,为3时表示不复权,默认为前复权
  • 返回值为包含所有待处理股票的最新一日日线数据及扩展因子的DataFrame
    engine = create_mysql_engine()

创建数据库引擎对象。

    latest_df = pd.DataFrame()

创建空DataFrame,存储最新一日的数据。

    for index, code in enumerate(stock_codes):

更新数据循环。

        bs.login()

登录BaoStock。

        table_name = '{}_{}'.format(code[3:], code[:2])

股票数据在数据库中的表名。

        if table_name not in sqlalchemy.inspect(engine).get_table_names():
            continue

判断是否存在该表,不存在则跳过。

        sql_cmd = 'SELECT * FROM {} ORDER BY date DESC LIMIT {};'.format(table_name, query_days)
        read_df = pd.read_sql(sql=sql_cmd, con=engine)

获取按时间排序的最后query_days行数据。

        if read_df.shape[0] < query_days:
            continue

如果数据少于query_days,则不更新。

        read_df = read_df.sort_values(by='id', ascending=True)

数据按id(date)升序排序,后续计算扩展因子需要数据按日期升序排序。

        last_id = read_df['id'].iloc[-1]
        from_date = read_df['date'].iloc[-1]

获取数据库中最新数据日期及id。

        if from_date >= datetime.date.today().strftime('%Y-%m-%d'):

如果数据库中已包含最新一日数据。

            latest_series = read_df.iloc[-1].copy()
            latest_series['code'] = code[3:]
            latest_df = latest_df.append(latest_series)
            continue

将最新一日数据,添加code字段,append到latest_df中,并进行下一只股票更新。

        from_date = (datetime.datetime.strptime(
            from_date, '%Y-%m-%d') + datetime.timedelta(days=1)).strftime('%Y-%m-%d')

计算待更新数据的开始日期。

        out_df = bs.query_history_k_data_plus(code, g_baostock_data_fields, start_date=from_date,
                                              end_date=datetime.date.today().strftime('%Y-%m-%d'),
                                              frequency='d', adjustflag=adjustflag).get_data()

下载从开始日期到当日的日线数据。

        if out_df.shape[0]:
            out_df = out_df[(out_df['volume'] != '0') & (out_df['volume'] != '')]

剔除停盘数据。

        if not out_df.shape[0]:
            continue

过滤后如果数据为空,则不更新。

        convert_list = ['open', 'high', 'low', 'close', 'preclose', 'volume', 'amount', 'turn', 'pctChg']
        out_df[convert_list] = out_df[convert_list].astype(float)

将数值数据转为float型,便于后续处理。

        new_rows = out_df.shape[0]

记录新添加的行数。

        out_df = read_df[list(out_df)].append(out_df)

获取下载字段,拼接DataFrame,用于后续计算扩展指标。

        out_df.reset_index(drop=True, inplace=True)

重置索引。

        out_df = extend_factor(out_df)

计算扩展因子

        out_df = out_df.iloc[-new_rows:]

取最后new_rows行。

        out_df['id'] = pd.Series(np.arange(last_id + 1, last_id + 1 + new_rows), index=out_df.index)

更新id。

        out_df.to_sql(name=table_name, con=engine, if_exists='append', index=False)

将更新数据添加到数据库。

        latest_series = out_df.iloc[-1].copy()
        latest_series['code'] = code[3:]
        latest_df = latest_df.append(latest_series)

将更新的最后一行添加code字段,append到latest_df中。

    return latest_df

返回包含最新一日股票日线数据的DataFrame。

新增多进程更新数据函数

def update_data_mp(stock_codes, process_num=61, query_days=60, adjustflag='2'):

该函数使用多进程更新日线数据,计算扩展因子,其中:

  • 参数stock_codes为待更新数据的股票代码
  • 参数process_num为进程数
  • 参数query_days为在数据库中查询历史日线数据的天数,用于计算扩展因子
  • 参数adjustflag为复权选项,为1时表示后复权,为2时表示前复权,为3时表示不复权,默认为前复权
  • 返回值为包含所有待处理股票的最新一日日线数据的DataFrame

该函数会将最新一日各个股票数据存储在数据库表latest中,便于后续筛选候选股票使用。

    latest_df = multiprocessing_func_df(update_data, (process_num, stock_codes, query_days, adjustflag,))

多进程计算获得最新日线数据,调用multiprocessing_func_df函数,收集子进程调用update_data函数返回的DataFrame数据。

    if latest_df.shape[0]:
        latest_df.to_sql(name='latest', con=create_mysql_engine(), if_exists='replace', index=False)

将所有股票最新一日日线数据写入数据库表latest。

    return latest_df

返回值包含所有待处理股票的最新一日日线数据的DataFrame。

2021年11月26日更新后latest表如下图所示:

按日更新股票数据——从零到实盘13

小结

本文实现了多进程更新股票数据,并把最新一日各只股票的数据存在数据库表latest中,供后续候选股票筛选使用。
下一篇文章将记录筛选候选股票的过程。

data_center_v11.py的全部代码如下:

import baostock as bs
import datetime
import time
import sys
import numpy as np
import pandas as pd
import multiprocessing
import sqlalchemy
import matplotlib.pyplot as plt
from pandas.plotting import table

g_available_days_limit = 250

g_baostock_data_fields = 'date,open,high,low,close,preclose,volume,amount,adjustflag,turn,tradestatus,pctChg,peTTM,pbMRQ, psTTM,pcfNcfTTM,isST'

def create_mysql_engine():
"""
    创建数据库引擎对象

    :return: 新创建的数据库引擎对象
"""

    host = 'localhost'
    user = 'root'
    passwd = '111111'
    port = '3306'
    db = 'db_quant'

    mysql_engine = sqlalchemy.create_engine(
        'mysql+pymysql://{0}:{1}@{2}:{3}'.format(user, passwd, host, port),
        poolclass=sqlalchemy.pool.NullPool
    )

    mysql_engine.execute("CREATE DATABASE IF NOT EXISTS {0} ".format(db))

    db_engine = sqlalchemy.create_engine(
        'mysql+pymysql://{0}:{1}@{2}:{3}/{4}?charset=utf8'.format(user, passwd, host, port, db),
        poolclass=sqlalchemy.pool.NullPool
    )

    return db_engine

def get_stock_codes(date=None, update=False):
"""
    获取指定日期的A股代码列表

    若参数update为False,表示从数据库中读取股票列表
    若数据库中不存在股票列表的表,或者update为True,则下载指定日期date的交易股票列表
    若参数date为空,则返回最近1个交易日的A股代码列表
    若参数date不为空,且为交易日,则返回date当日的A股代码列表
    若参数date不为空,但不为交易日,则打印提示非交易日信息,程序退出

    :param date: 日期,默认为None
    :param update: 是否更新股票列表,默认为False
    :return: A股代码的列表
"""

    engine = create_mysql_engine()

    table_name = 'stock_codes'

    if table_name not in sqlalchemy.inspect(engine).get_table_names() or update:

        bs.login()

        stock_df = bs.query_all_stock(date).get_data()

        if 0 == len(stock_df):

            if date is not None:
                print('当前选择日期为非交易日或尚无交易数据,请设置date为历史某交易日日期')
                sys.exit(0)

            delta = 1
            while 0 == len(stock_df):
                stock_df = bs.query_all_stock(datetime.date.today() - datetime.timedelta(days=delta)).get_data()
                delta += 1

        bs.logout()

        stock_df = stock_df[(stock_df['code'] >= 'sh.600000') & (stock_df['code'] < 'sz.399000')]

        stock_df.to_sql(name=table_name, con=engine, if_exists='replace', index=False, index_label=False)

        return stock_df['code'].tolist()

    else:

        sql_cmd = 'SELECT {} FROM {}'.format('code', table_name)

        return pd.read_sql(sql=sql_cmd, con=engine)['code'].tolist()

def create_data(stock_codes, from_date='1990-12-19', to_date=datetime.date.today().strftime('%Y-%m-%d'),
                adjustflag='2'):
"""
    下载指定日期内,指定股票的日线数据,计算扩展因子

    :param stock_codes: 待下载数据的股票代码
    :param from_date: 日线开始日期
    :param to_date: 日线结束日期
    :param adjustflag: 复权选项 1:后复权  2:前复权  3:不复权  默认为前复权
    :return: None
"""

    engine = create_mysql_engine()

    for index, code in enumerate(stock_codes):
        print('({}/{})正在创建{}...'.format(index + 1, len(stock_codes), code))

        bs.login()

        out_df = bs.query_history_k_data_plus(code, g_baostock_data_fields, start_date=from_date, end_date=to_date,
                                              frequency='d', adjustflag=adjustflag).get_data()

        if out_df.shape[0]:
            out_df = out_df[(out_df['volume'] != '0') & (out_df['volume'] != '')]

        if not out_df.shape[0]:
            continue

        out_df.drop_duplicates(['date'], inplace=True)

        if out_df.shape[0] < g_available_days_limit:
            continue

        convert_list = ['open', 'high', 'low', 'close', 'preclose', 'volume', 'amount', 'turn', 'pctChg']
        out_df[convert_list] = out_df[convert_list].astype(float)

        out_df.reset_index(drop=True, inplace=True)

        out_df = extend_factor(out_df)

        table_name = '{}_{}'.format(code[3:], code[:2])
        out_df.to_sql(name=table_name, con=engine, if_exists='replace', index=True, index_label='id')

def get_code_group(process_num, stock_codes):
"""
    获取代码分组,用于多进程计算,每个进程处理一组股票

    :param process_num: 进程数
    :param stock_codes: 待处理的股票代码
    :return: 分组后的股票代码列表,列表的每个元素为一组股票代码的列表
"""

    code_group = [[] for i in range(process_num)]

    for index, code in enumerate(stock_codes):
        code_group[index % process_num].append(code)

    return code_group

def multiprocessing_func(func, args):
"""
    多进程调用函数

    :param func: 函数名
    :param args: func的参数,类型为元组,第0个元素为进程数,第1个元素为股票代码列表
    :return: 包含各子进程返回对象的列表
"""

    results = []

    with multiprocessing.Pool(processes=args[0]) as pool:

        for codes in get_code_group(args[0], args[1]):
            results.append(pool.apply_async(func, args=(codes, *args[2:],)))

        pool.close()

        pool.join()

    return results

def create_data_mp(stock_codes, process_num=61,
                   from_date='1990-12-19', to_date=datetime.date.today().strftime('%Y-%m-%d'), adjustflag='2'):
"""
    使用多进程创建指定日期内,指定股票的日线数据,计算扩展因子

    :param stock_codes: 待创建数据的股票代码
    :param process_num: 进程数
    :param from_date: 日线开始日期
    :param to_date: 日线结束日期
    :param adjustflag: 复权选项 1:后复权  2:前复权  3:不复权  默认为前复权
    :return: None
"""

    multiprocessing_func(create_data, (process_num, stock_codes, from_date, to_date, adjustflag,))

def extend_factor(df):
"""
    计算扩展因子

    :param df: 待计算扩展因子的DataFrame
    :return: 包含扩展因子的DataFrame
"""

    df = df.pipe(zt).pipe(ss, delta_days=30).pipe(candidate)

    return df

def zt(df):
"""
    计算涨停因子

    若涨停,则因子为True,否则为False
    以当日收盘价较前一日收盘价上涨9.8%及以上作为涨停判断标准

    :param df: 待计算扩展因子的DataFrame
    :return: 包含扩展因子的DataFrame
"""

    df['zt'] = np.where((df['close'].values >= 1.098 * df['preclose'].values), True, False)

    return df

def shift_i(df, factor_list, i, fill_value=0, suffix='a'):
"""
    计算移动因子,用于获取前i日或者后i日的因子

    :param df: 待计算扩展因子的DataFrame
    :param factor_list: 待移动的因子列表
    :param i: 移动的步数
    :param fill_value: 用于填充NA的值,默认为0
    :param suffix: 值为a(ago)时表示移动获得历史数据,用于计算指标;值为l(later)时表示获得未来数据,用于计算收益
    :return: 包含扩展因子的DataFrame
"""

    shift_df = df[factor_list].shift(i, fill_value=fill_value)

    shift_df.rename(columns={x: '{}_{}{}'.format(x, i, suffix) for x in factor_list}, inplace=True)

    df = pd.concat([df, shift_df], axis=1)

    return df

def shift_till_n(df, factor_list, n, fill_value=0, suffix='a'):
"""
    计算范围移动因子

    用于获取前/后n日内的相关因子,内部调用了shift_i

    :param df: 待计算扩展因子的DataFrame
    :param factor_list: 待移动的因子列表
    :param n: 移动的步数范围
    :param fill_value: 用于填充NA的值,默认为0
    :param suffix: 值为a(ago)时表示移动获得历史数据,用于计算指标;值为l(later)时表示获得未来数据,用于计算收益
    :return: 包含扩展因子的DataFrame
"""

    for i in range(n):
        df = shift_i(df, factor_list, i + 1, fill_value, suffix)
    return df

def ss(df, delta_days=30):
"""
    计算双神因子,即间隔的两个涨停

    若当日形成双神,则因子为True,否则为False

    :param df: 待计算扩展因子的DataFrame
    :param delta_days: 两根涨停间隔的时间不能超过该值,否则不判定为双神,默认值为30
    :return: 包含扩展因子的DataFrame
"""

    temp_df = shift_till_n(df, ['zt'], delta_days, fill_value=False)

    col_list = ['zt_{}a'.format(x) for x in range(2, delta_days + 1)]

    df['ss'] = temp_df[col_list].any(axis=1) & ~temp_df['zt_1a'] & temp_df['zt']

    return df

def ma(df, n=5, factor='close'):
"""
    计算均线因子

    :param df: 待计算扩展因子的DataFrame
    :param n: 待计算均线的周期,默认计算5日均线
    :param factor: 待计算均线的因子,默认为收盘价
    :return: 包含扩展因子的DataFrame
"""

    name = '{}ma_{}'.format('' if 'close' == factor else factor + '_', n)

    s = pd.Series(df[factor], name=name, index=df.index)

    s = s.rolling(center=False, window=n).mean()

    df = df.join(s)

    df[name] = df[name].apply(lambda x: round(x + 0.001, 2))

    return df

def mas(df, ma_list, factor='close'):
"""
    计算多条均线因子,内部调用ma计算单条均线

    :param df: 待计算扩展因子的DataFrame
    :param ma_list: 待计算均线的周期列表
    :param factor: 待计算均线的因子,默认为收盘价
    :return: 包含扩展因子的DataFrame
"""

    for i in ma_list:
        df = ma(df, i, factor)
    return df

def cross_mas(df, ma_list):
"""
    计算穿均线因子

    若当日最低价不高于均线价格
    且当日收盘价不低于均线价格
    则当日穿均线因子值为True,否则为False

    :param df: 待计算扩展因子的DataFrame
    :param ma_list: 均线的周期列表
    :return: 包含扩展因子的DataFrame
"""

    for i in ma_list:
        df['cross_{}'.format(i)] = (df['low']  df['ma_{}'.format(i)]) & (
                df['ma_{}'.format(i)]  df['close'])
    return df

def candidate(df):
"""
    计算是否为候选

    若当日日线同时穿过5、10、20、30日均线
    且30日均线在60日均线上方
    且当日形成双神
    则当日作为候选,该因子值为True,否则为False

    :param df: 待计算扩展因子的DataFrame
    :return: 包含扩展因子的DataFrame
"""

    ma_list = [5, 10, 20, 30, 60]

    temp_df = mas(df, ma_list)

    temp_df = cross_mas(temp_df, ma_list)

    column_list = ['cross_{}'.format(x) for x in ma_list[:-1]]

    df['candidate'] = temp_df[column_list].all(axis=1) & (temp_df['ma_30'] >= temp_df['ma_60']) & df['ss']

    return df

def profit_loss_statistic(stock_codes, hold_days=10):
"""
    盈亏分布统计,计算当日candidate为True,持仓hold_days天的收益分布

    :param stock_codes: 待分析的股票代码
    :param hold_days: 持仓天数
    :return: 筛选出符合买入条件的股票DataFrame
"""

    engine = create_mysql_engine()

    candidate_df = pd.DataFrame()

    for code in stock_codes:
        print('正在处理{}...'.format(code))

        table_name = '{}_{}'.format(code[3:], code[:2])

        if table_name not in sqlalchemy.inspect(engine).get_table_names():
            continue

        cols = 'date, open, high, low, close, candidate'
        sql_cmd = 'SELECT {} FROM {} ORDER BY date DESC'.format(cols, table_name)
        df = pd.read_sql(sql=sql_cmd, con=engine)

        df = shift_i(df, ['open'], 1, suffix='l')
        df = shift_till_n(df, ['high', 'low'], hold_days, suffix='l')

        df = df.iloc[hold_days: df.shape[0] - g_available_days_limit, :]

        df = df[(df['candidate'] > 0) & (df['low_1l']  df['close'])]

        if df.shape[0]:
            df['code'] = code
            candidate_df = candidate_df.append(df)

    if candidate_df.shape[0]:

        cols = ['high_{}l'.format(x) for x in range(2, hold_days + 1)]
        candidate_df['max_high'] = candidate_df[cols].max(axis=1)
        candidate_df['max_profit'] = candidate_df['max_high'] / candidate_df[['open_1l', 'close']].min(axis=1) - 1

        cols = ['low_{}l'.format(x) for x in range(2, hold_days + 1)]
        candidate_df['min_low'] = candidate_df[cols].min(axis=1)
        candidate_df['max_loss'] = candidate_df['min_low'] / candidate_df[['open_1l', 'close']].min(axis=1) - 1

    return candidate_df

def multiprocessing_func_df(func, args):
"""
    多进程调用函数,收集返回各子进程返回的DataFrame

    :param func: 函数名
    :param args: func的参数,类型为元组,第0个元素为进程数,第1个元素为股票代码列表
    :return: 包含各子进程返回值的DataFrame
"""

    results = multiprocessing_func(func, args)

    df = pd.DataFrame()

    for i in results:
        df = df.append(i.get())

    return df

def profit_loss_statistic_mp(stock_codes, process_num=61, hold_days=10):
"""
    多进程盈亏分布统计,计算当日candidate为True,持仓hold_days天的收益分布
    输出数据分布表格及图片文件

    :param stock_codes: 待分析的股票代码
    :param process_num: 进程数
    :param hold_days: 持仓天数
    :return: None
"""

    candidate_df = multiprocessing_func_df(profit_loss_statistic, (process_num, stock_codes, hold_days,))

    candidate_df[['date', 'code', 'max_profit', 'max_loss']].to_excel(
        'profit_loss_{}.xlsx'.format(hold_days), index=False, encoding='utf-8')

    candidate_df.reset_index(inplace=True)

    fig, ax = plt.subplots(1, 1)
    table(ax, np.round(candidate_df[['max_profit', 'max_loss']].describe(), 4), loc='upper right',
          colWidths=[0.2, 0.2, 0.2])
    candidate_df[['max_profit', 'max_loss']].plot(ax=ax, legend=None)

    fig.savefig('profit_loss_{}.png'.format(hold_days))

    plt.show()

def update_data(stock_codes, query_days=60, adjustflag='2'):
"""
    更新日线数据,计算相关因子

    :param stock_codes: 待更新数据的股票代码
    :param query_days: 在数据库中查询历史日线数据的天数,用于计算扩展因子,需要根据扩展因子设置,这里要计算60日均线,所以最小设置为60
    :param adjustflag: 复权选项 1:后复权  2:前复权  3:不复权  默认为前复权
    :return: 包含所有待处理股票的最新一日日线数据及扩展因子的DataFrame
"""

    engine = create_mysql_engine()

    latest_df = pd.DataFrame()

    for index, code in enumerate(stock_codes):
        print('({}/{})正在更新{}...'.format(index + 1, len(stock_codes), code))

        bs.login()

        table_name = '{}_{}'.format(code[3:], code[:2])

        if table_name not in sqlalchemy.inspect(engine).get_table_names():
            continue

        sql_cmd = 'SELECT * FROM {} ORDER BY date DESC LIMIT {};'.format(table_name, query_days)
        read_df = pd.read_sql(sql=sql_cmd, con=engine)

        if read_df.shape[0] < query_days:
            continue

        read_df = read_df.sort_values(by='id', ascending=True)

        last_id = read_df['id'].iloc[-1]
        from_date = read_df['date'].iloc[-1]

        if from_date >= datetime.date.today().strftime('%Y-%m-%d'):

            latest_series = read_df.iloc[-1].copy()
            latest_series['code'] = code[3:]
            latest_df = latest_df.append(latest_series)
            print('{}当前已为最新数据'.format(code))
            continue

        from_date = (datetime.datetime.strptime(
            from_date, '%Y-%m-%d') + datetime.timedelta(days=1)).strftime('%Y-%m-%d')

        out_df = bs.query_history_k_data_plus(code, g_baostock_data_fields, start_date=from_date,
                                              end_date=datetime.date.today().strftime('%Y-%m-%d'),
                                              frequency='d', adjustflag=adjustflag).get_data()

        if out_df.shape[0]:
            out_df = out_df[(out_df['volume'] != '0') & (out_df['volume'] != '')]

        if not out_df.shape[0]:
            continue

        convert_list = ['open', 'high', 'low', 'close', 'preclose', 'volume', 'amount', 'turn', 'pctChg']
        out_df[convert_list] = out_df[convert_list].astype(float)

        new_rows = out_df.shape[0]

        out_df = read_df[list(out_df)].append(out_df)

        out_df.reset_index(drop=True, inplace=True)

        out_df = extend_factor(out_df)

        out_df = out_df.iloc[-new_rows:]

        if np.any(out_df.isnull()):
            print('{}有缺失字段!!!'.format(code))

        out_df['id'] = pd.Series(np.arange(last_id + 1, last_id + 1 + new_rows), index=out_df.index)

        out_df.to_sql(name=table_name, con=engine, if_exists='append', index=False)

        latest_series = out_df.iloc[-1].copy()
        latest_series['code'] = code[3:]
        latest_df = latest_df.append(latest_series)

    return latest_df

def update_data_mp(stock_codes, process_num=61, query_days=60, adjustflag='2'):
"""
    使用多进程更新日线数据,计算扩展因子

    :param stock_codes: 待更新数据的股票代码
    :param process_num: 进程数
    :param query_days: 在数据库中查询历史日线数据的天数,用于计算扩展因子
    :param adjustflag: 复权选项 1:后复权  2:前复权  3:不复权  默认为前复权
    :return: 包含所有待处理股票的最新日线数据的DataFrame
"""

    latest_df = multiprocessing_func_df(update_data, (process_num, stock_codes, query_days, adjustflag,))

    if latest_df.shape[0]:
        latest_df.to_sql(name='latest', con=create_mysql_engine(), if_exists='replace', index=False)

    return latest_df

if __name__ == '__main__':
    start_time = time.time()
    stock_codes = get_stock_codes()
    update_data_mp(stock_codes)
    end_time = time.time()
    print('程序运行时间:{}s'.format(end_time - start_time))

博客内容只用于交流学习,不构成投资建议,盈亏自负!
个人博客:https://coderx.com.cn/(优先更新)
项目最新代码:https://gitee.com/sl/quant_from_scratch
欢迎大家转发、留言。已建微信群用于学习交流,群1已满,群2已创建,感兴趣的读者请扫码加微信!

Original: https://blog.csdn.net/m0_46603114/article/details/122389687
Author: 码农甲V
Title: 按日更新股票数据——从零到实盘13

原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/755212/

转载文章受原作者版权保护。转载请注明原作者出处!

(0)

大家都在看

亲爱的 Coder【最近整理,可免费获取】👉 最新必读书单  | 👏 面试题下载  | 🌎 免费的AI知识星球