diff --git a/src/loader/DDBHFTLoader.py b/src/loader/DDBHFTLoader.py index 5360ea4..641acd3 100644 --- a/src/loader/DDBHFTLoader.py +++ b/src/loader/DDBHFTLoader.py @@ -96,8 +96,7 @@ class DDBHFTLoader(DDBLoader): default_table_capacity = 10000 # TODO: 这里需要饮用自身文件的绝对路径,然后再寻找其父目录 ddb_dump_journal_fname = '../assets/ddb_dump_journal.csv' - - ddb_sess_pool = ddb.DBConnectionPool(DDBLoader.ddb_config['host'], 8848, num_workers) + #ddb_sess_pool = ddb.DBConnectionPool(DDBLoader.ddb_config['host'], 8848, num_workers) def init_ddb_database(self, df_calendar): """ @@ -120,16 +119,16 @@ class DDBHFTLoader(DDBLoader): stock_list = df_calendar['code'].unique().astype('str') # 不能重复创建Pool对象,因此需要在循环的最外侧创建好Pool对象,然后传参进去 - with Pool(self.num_workers if num_workers is None else num_workers) as pool: - # Always reuse the connection object, to reduce the memory consumption. - with self.mssql_engine.connect() as conn: - # Loop through the stock list. - for hft_type_name in self.hft_type_list: - print('Will work on hft type:', hft_type_name) - with tqdm(stock_list) as pbar: - for stock_id in pbar: - pbar.set_description(f"Working on stock {stock_id}") - self.dump_hft_to_ddb(hft_type_name, stock_id, conn, pbar=pbar, pool=pool) + #with Pool(self.num_workers if num_workers is None else num_workers) as pool: + # Always reuse the connection object, to reduce the memory consumption. + with self.mssql_engine.connect() as conn: + # Loop through the stock list. + for hft_type_name in self.hft_type_list: + print('Will work on hft type:', hft_type_name) + with tqdm(stock_list) as pbar: + for stock_id in pbar: + pbar.set_description(f"Working on stock {stock_id}") + self.dump_hft_to_ddb(hft_type_name, stock_id, conn, pbar=pbar) def _get_stock_date_list(self, cache=False): @@ -359,7 +358,7 @@ class DDBHFTLoader(DDBLoader): print('-' * 80) - def dump_hft_to_ddb(self, type_name, stock_id, conn, trade_date=None, pbar=None, pool=None): + def dump_hft_to_ddb(self, type_name, stock_id, conn, trade_date=None, pbar=None): if (type_name, stock_id, 'OK') in self.dump_journal_df.index: message = f"Will skip ({type_name}, {stock_id}) as it appears in the dump journal." if pbar is None: @@ -413,13 +412,14 @@ class DDBHFTLoader(DDBLoader): # 使用多进程来加快速度 #with Pool(self.num_workers if num_workers is None else num_workers) as pool: - if pool is None: - print("Will create new Pool object, but this is not encourage for large batch work.") - pool = Pool(self.num_worker) + #if pool is None: + # print("Will create new Pool object, but this is not encourage for large batch work.") + # pool = Pool(self.num_worker) + py_proc_pool = Pool(self.num_workers) # 在单个股票内部,对不同日期进行并行处理,对内存使用较为友好,不需要同时载入多个股票海量的全历史数据 with tqdm(total=num_rows, leave=False) as sub_pbar: - for _ in pool.imap_unordered( + for _ in py_proc_pool.imap_unordered( functools.partial( DDBHFTLoader.dump_stock_daily_to_ddb, type_name = type_name, @@ -429,7 +429,12 @@ class DDBHFTLoader(DDBLoader): ): sub_pbar.update() + # Always remember to close and join the pool mannally. + py_proc_pool.close() + py_proc_pool.join() + del(row_list) + self.dump_journal_writer.write(f"{type_name},{stock_id},OK\n") self.dump_journal_writer.flush() @@ -506,5 +511,6 @@ class DDBHFTLoader(DDBLoader): ddb_sess.undefAll() ddb_sess.close() del(ddb_sess) + del(row)