打开APP
userphoto
未登录

开通VIP,畅享免费电子书等14项超值服

开通VIP
Python - 快速处理通达信下载的A股历史行情数据(完整代码)
userphoto

2022.12.03 浙江

关注

为了克服这些问题,可以使用证券公司提供的客户端程序(有通达信版、同花顺版等)下载历史行情数据,每个交易日收盘15分钟后,即可下载。下载后的数据的格式和长度不一定满足自己的要求,可以用本文提供的python代码,本文也提供了整个操作过程的视频演示。

本文提供的历史行情数据处理的python代码文件,其中使用了多进程工具包 multiprocessing,以便充分发挥多核多线程CPU的能力,提高数据处理的速度。沪深两个交易所现在总共有将近5000只股票,把所有的日线、5分钟线和1分钟线数据都处理一边,如果用双核CPU电脑,可能需要一个小时。处理数据所耗时间和python代码是否采用多进程多线程方法和CPU的核数都有关系。笔者处理数据使用的电脑是两颗 Intel(R) Xeon(R) CPU E5-2660 v2 @ 2.20GHz 2.20 GHz 处理器,共2*20线程,全部数据处理一次只需要 124 秒。

本文提供的python程序,包括三个自定义函数:

  • def preprocess_historymd(path_filename_mdrows),先对下载后的历史行情数据进行预处理,删除行情数据文件的表头和末尾行。未做处理前的行情数据为csv格式,如下图:
  • def multiprocess_filteringquotes(quote_watchingsymbol_list),构建多进程池
  • def filtering_quotes(onequote_watchingsymbollist), 处理单个的行情数据文件

完整代码如下:

'''按指定行数截取行情数据'''import osimport csvfrom multiprocessing import Poolfrom datetime import datetime, timedeltaimport timedef filtering_quotes(onequote_watchingsymbollist): new_quote_itm = onequote_watchingsymbollist[0] watching_marketsymbol_lst = onequote_watchingsymbollist[1] filterquote = '' if new_quote_itm[9:21] in watching_marketsymbol_lst: quote_line = new_quote_itm.split('-') normal_dt = str(datetime.combine(datetime.today(),datetime.strptime(quote_line[0][2:],'%H%M%S').time())) filterquote = [normal_dt,quote_line[1],quote_line[2],quote_line[3],quote_line[4]] else: filterquote = '' return filterquotedef multiprocess_filteringquotes(quote_watchingsymbol_list): pool = Pool() quote_data = pool.map(filtering_quotes,quote_watchingsymbol_list) pool.close() pool.join() return quote_datadef preprocess_historymd(path_filename_mdrows): path_filename = path_filename_mdrows[0] total_mdrows = path_filename_mdrows[1] cut_lastrows = path_filename_mdrows[2] with open(path_filename,'r') as csv_file: #watched_stocks.csv, watch_list.csv csv_reader = csv.reader(csv_file) csv2list = [itm for itm in csv_reader] csv2list.pop(0) csv2list.pop(0) csv2list.pop(-1) if total_mdrows < len(csv2list): csv2list_cut = csv2list[len(csv2list)-(total_mdrows+abs(cut_lastrows)):(len(csv2list)+cut_lastrows)] else: csv2list_cut = csv2list[0:(len(csv2list)+cut_lastrows)] csv2list_new = [] for itm in csv2list_cut: if len(itm) == 7: csv2list_new.append([path_filename[-10:-4], itm[0].replace('/','-'),itm[1],itm[2],itm[3],itm[4],itm[5],itm[6]]) if len(itm) == 8: csv2list_new.append([path_filename[-10:-4], itm[0].replace('/','-')+' '+itm[1][0:2]+':'+itm[1][2:4], itm[2],itm[3],itm[4],itm[5],itm[6],itm[7]]) # with open(path_filename,'w') as f: #仅用于测试观察数据 # for i in range(0,len(csv2list_new)): # x = str(csv2list_new[i]).replace(''','').replace('[','').replace(']','')+'\n' # # x = csv2list_new[i] # f.write(x) print(f'行情文件 {path_filename} 处理完成, 共 { len(csv2list_new)} 行:')def multiprocess_historymd(historymd_filename_list): pool = Pool() historymd_data = pool.map(preprocess_historymd,historymd_filename_list) pool.close() pool.join() return historymd_data if __name__ == '__main__': T_start = time.perf_counter() given_dailymd_path = 'D:\\SecurityData\\MD_daily' given_min1md_path = 'D:\\SecurityData\\MD_1min' # given_dailymd_path = 'D:\\SecurityData\\test_dailyMD' # given_min1md_path = 'D:\\SecurityData\\test_min1MD' # 选取指定时间段的历史行情,如倒数3个交易日之前的1000个交易日, # 选取指定时间段的1分钟历史行情,如倒数3个交易日之前的80个交易日,注意倒数几个交易日必须与日线相同 total_days = 1000 #选取 80 个交易日的1分钟数据 total_mindays = 80 #如果截止到当前最后一个交易日,则倒数0个交易日; 如果截止到倒数第3个交易日的数据,downcount_days = -3 downcounter_days = 0 ''' 处理通达信历史行情数据文件 - 日线行情 ''' dailyfilename_list = os.listdir(given_dailymd_path) daily_filename = str(dailyfilename_list) dailyfilename_split = daily_filename.split(',') dailymdfiles_list = [given_dailymd_path +'\\' + itm[2:15] for itm in dailyfilename_split] dailymdrows_total = [total_days]*len(dailymdfiles_list) if downcounter_days < 0: # 截止到倒数0个交易日的日线 dailymdrows_end = [downcounter_days]*len(dailymdfiles_list) if downcounter_days == 0: # 截止到最后一个交易日,不用倒数 dailymdrows_end = [0]*len(dailymdfiles_list) daily_result = multiprocess_historymd(zip(dailymdfiles_list,dailymdrows_total,dailymdrows_end)) ''' 处理通达信历史行情数据文件 - 1分钟行情 ''' min1filename_list = os.listdir(given_min1md_path) min1_filename = str(min1filename_list) min1filename_split = min1_filename.split(',') min1mdfiles_list = [given_min1md_path +'\\' + itm[2:15] for itm in min1filename_split] min1mdrows_total = [240*total_mindays]*len(min1mdfiles_list) if downcounter_days < 0: # 截止到倒数3个交易日的1分钟线 min1mdrows_end = [downcounter_days*240]*len(min1mdfiles_list) if downcounter_days == 0: # 截止到最后一个交易日,不用倒数 min1mdrows_end = [0]*len(min1mdfiles_list) min1_result = multiprocess_historymd(zip(min1mdfiles_list,min1mdrows_total,min1mdrows_end)) T1_end = time.perf_counter() print(f'\n耗时 {round(T1_end-T_start,2)} 秒\n')

处理后的历史行情数据格式如下图所示:

整个操作过程的视频演示,点击这里 ,看西瓜视频。

注意:获取几年的日线数据或分钟数据,与所在电脑上安装和使用通达信客户端程序的时间长短有关,初次安装后,默认可下载3年的日线数据,4个月的1分钟数据。

(本文完)

本站仅提供存储服务,所有内容均由用户发布,如发现有害或侵权内容,请点击举报
打开APP,阅读全文并永久保存 查看更多类似文章
猜你喜欢
类似文章
【热】打开小程序,算一算2024你的财运
使用Python处理手机通讯录导入与导出文件
Python 将数据写入文件(txt、csv、excel)
读取、创建和运行多个文件的3个Python技巧
从零实现”搭积木式实现策略“的回测系统 part II
Keras
ArcPy读取Excel时序数据、批量反距离加权IDW插值与掩膜
更多类似文章 >>
生活服务
热点新闻
分享 收藏 导长图 关注 下载文章
绑定账号成功
后续可登录账号畅享VIP特权!
如果VIP功能使用有故障,
可点击这里联系客服!

联系客服