@@ -96,8 +96,7 @@ class DDBHFTLoader(DDBLoader):
     default_table_capacity = 10000
     # TODO: this should use the absolute path of the current file itself, and then locate its parent directory
     ddb_dump_journal_fname = '../assets/ddb_dump_journal.csv'
-    ddb_sess_pool = ddb.DBConnectionPool(DDBLoader.ddb_config['host'], 8848, num_workers)
+    #ddb_sess_pool = ddb.DBConnectionPool(DDBLoader.ddb_config['host'], 8848, num_workers)
     def init_ddb_database(self, df_calendar):
         """
@@ -120,16 +119,16 @@ class DDBHFTLoader(DDBLoader):
         stock_list = df_calendar['code'].unique().astype('str')
         # The Pool object must not be created more than once, so create it at the outermost level of the loop and pass it in as an argument
-        with Pool(self.num_workers if num_workers is None else num_workers) as pool:
-            # Always reuse the connection object, to reduce the memory consumption.
-            with self.mssql_engine.connect() as conn:
-                # Loop through the stock list.
-                for hft_type_name in self.hft_type_list:
-                    print('Will work on hft type:', hft_type_name)
-                    with tqdm(stock_list) as pbar:
-                        for stock_id in pbar:
-                            pbar.set_description(f"Working on stock {stock_id}")
-                            self.dump_hft_to_ddb(hft_type_name, stock_id, conn, pbar=pbar, pool=pool)
+        #with Pool(self.num_workers if num_workers is None else num_workers) as pool:
+        # Always reuse the connection object, to reduce the memory consumption.
+        with self.mssql_engine.connect() as conn:
+            # Loop through the stock list.
+            for hft_type_name in self.hft_type_list:
+                print('Will work on hft type:', hft_type_name)
+                with tqdm(stock_list) as pbar:
+                    for stock_id in pbar:
+                        pbar.set_description(f"Working on stock {stock_id}")
+                        self.dump_hft_to_ddb(hft_type_name, stock_id, conn, pbar=pbar)
     def _get_stock_date_list(self, cache=False):
@@ -359,7 +358,7 @@ class DDBHFTLoader(DDBLoader):
         print('-' * 80)
-    def dump_hft_to_ddb(self, type_name, stock_id, conn, trade_date=None, pbar=None, pool=None):
+    def dump_hft_to_ddb(self, type_name, stock_id, conn, trade_date=None, pbar=None):
         if (type_name, stock_id, 'OK') in self.dump_journal_df.index:
             message = f"Will skip ({type_name}, {stock_id}) as it appears in the dump journal."
             if pbar is None:
@@ -413,13 +412,14 @@ class DDBHFTLoader(DDBLoader):
         # Use multiprocessing to speed things up
         #with Pool(self.num_workers if num_workers is None else num_workers) as pool:
-        if pool is None:
-            print("Will create new Pool object, but this is not encouraged for large batch work.")
-            pool = Pool(self.num_worker)
+        #if pool is None:
+        #    print("Will create new Pool object, but this is not encouraged for large batch work.")
+        #    pool = Pool(self.num_worker)
+        py_proc_pool = Pool(self.num_workers)
         # Within a single stock, parallelize over the different dates; this is friendlier to memory usage, since it avoids loading the massive full history of several stocks at the same time
         with tqdm(total=num_rows, leave=False) as sub_pbar:
-            for _ in pool.imap_unordered(
+            for _ in py_proc_pool.imap_unordered(
                 functools.partial(
                     DDBHFTLoader.dump_stock_daily_to_ddb,
                     type_name=type_name,
@@ -429,7 +429,12 @@ class DDBHFTLoader(DDBLoader):
             ):
                 sub_pbar.update()
+        # Always remember to close and join the pool manually.
+        py_proc_pool.close()
+        py_proc_pool.join()
         del(row_list)
         self.dump_journal_writer.write(f"{type_name},{stock_id},OK\n")
         self.dump_journal_writer.flush()
@@ -506,5 +511,6 @@ class DDBHFTLoader(DDBLoader):
         ddb_sess.undefAll()
         ddb_sess.close()
         del(ddb_sess)
+        del(row)
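
Taken together, the change above drops the long-lived Pool that used to be shared across all stocks: dump_hft_to_ddb now creates a short-lived multiprocessing pool per stock, fans the per-date work out with functools.partial plus imap_unordered, and closes and joins the pool before the next stock is processed. Below is a minimal, self-contained sketch of that pattern only; dump_stock_daily, dump_one_stock, and the sample arguments are hypothetical stand-ins, not the actual DDBHFTLoader methods.

import functools
from multiprocessing import Pool

from tqdm import tqdm


def dump_stock_daily(row, type_name, stock_id):
    # Hypothetical stand-in for DDBHFTLoader.dump_stock_daily_to_ddb:
    # process one trading day's data for (type_name, stock_id).
    return (type_name, stock_id, row)


def dump_one_stock(type_name, stock_id, row_list, num_workers=4):
    # One short-lived pool per stock keeps only a single stock's history
    # in flight at a time, which is the memory-friendly behaviour the
    # original comments describe.
    py_proc_pool = Pool(num_workers)
    with tqdm(total=len(row_list), leave=False) as sub_pbar:
        for _ in py_proc_pool.imap_unordered(
            functools.partial(dump_stock_daily, type_name=type_name, stock_id=stock_id),
            row_list,
        ):
            sub_pbar.update()
    # Always close and join the pool manually before moving on to the next stock.
    py_proc_pool.close()
    py_proc_pool.join()


if __name__ == '__main__':
    # Sample arguments are placeholders for illustration only.
    dump_one_stock('TickQueue', '000001.SZ', list(range(10)), num_workers=2)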