From 59b970532e80961068c929b439f04496f25e5d90 Mon Sep 17 00:00:00 2001
From: Guofu Li
Date: Thu, 25 Aug 2022 20:07:45 +0800
Subject: [PATCH] Update: move the connection out of the loop so that it can
 be reused, fixing another memory leak issue.

---
 src/loader/DDBHFTLoader.py | 110 +++++++++++++++++++------------------
 1 file changed, 56 insertions(+), 54 deletions(-)

diff --git a/src/loader/DDBHFTLoader.py b/src/loader/DDBHFTLoader.py
index 2a02166..c14f786 100644
--- a/src/loader/DDBHFTLoader.py
+++ b/src/loader/DDBHFTLoader.py
@@ -119,12 +119,15 @@ class DDBHFTLoader(DDBLoader):
 
         # Pool objects must not be created repeatedly, so create the Pool outside the outermost loop and pass it in as an argument.
         with Pool(self.num_workers if num_workers is None else num_workers) as pool:
-            for hft_type_name in self.hft_type_list:
-                print('Will work on hft type:', hft_type_name)
-                with tqdm(stock_list) as pbar:
-                    for stock_id in pbar:
-                        pbar.set_description(f"Working on stock {stock_id}")
-                        self.dump_hft_to_ddb(hft_type_name, stock_id, pbar=pbar, pool=pool)
+            # Always reuse the connection object to reduce memory consumption.
+            with self.mssql_engine.connect() as conn:
+                # Loop through the stock list.
+                for hft_type_name in self.hft_type_list:
+                    print('Will work on hft type:', hft_type_name)
+                    with tqdm(stock_list) as pbar:
+                        for stock_id in pbar:
+                            pbar.set_description(f"Working on stock {stock_id}")
+                            self.dump_hft_to_ddb(hft_type_name, stock_id, conn, pbar=pbar, pool=pool)
 
 
     def _get_stock_date_list(self, cache=False):
@@ -354,7 +357,7 @@ class DDBHFTLoader(DDBLoader):
         print('-' * 80)
 
 
-    def dump_hft_to_ddb(self, type_name, stock_id, trade_date=None, pbar=None, pool=None):
+    def dump_hft_to_ddb(self, type_name, stock_id, conn, trade_date=None, pbar=None, pool=None):
         if (type_name, stock_id, 'OK') in self.dump_journal_df.index:
             message = f"Will skip ({type_name}, {stock_id}) as it appears in the dump journal."
             if pbar is None:
@@ -376,53 +379,52 @@ class DDBHFTLoader(DDBLoader):
 
         # After some experimentation, batch querying per stock turns out to be acceptably efficient.
         # In mssql, the index fields are (S_INFO_WINDCODE, TRADE_DT).
-        with self.mssql_engine.connect() as conn:
-            stat = """
-                select * from [Level2Bytes{mssql_type_name}].dbo.[{mssql_type_name}]
-                where S_INFO_WINDCODE='{stock_id}'
-            """.format(
-                mssql_type_name = self.mssql_name_dict[type_name],
-                stock_id = stock_id
-            )
-            row_list = list(conn.execute(stat).fetchall())
-
-            # If `_journal_dt` is not None, the journal shows this stock's data is partially done, so the dates must be checked one by one.
-            # Only rows whose dates are not already in `_journal_dt` are kept in `row_list`.
-            if _journal_dt is not None:
-                row_list = [row for row in row_list
-                            if pd.to_datetime(row[1]) not in _journal_dt.index]
-                print(f"Resume job for {stock_id}, with {len(row_list)} rows left.")
-
-            num_rows = len(row_list)
-            # If there are no rows, the data is empty and we can return right away.
-            if num_rows == 0:
-                return
-
-            if pbar:
-                #pbar.set_description(f"Did get the result set for stock {stock_id} from mssql")
-                pbar.set_description(f"Will work in parallel on dumping job on {stock_id} of len {num_rows}")
-            else:
-                print(f"Did get the result set for stock {stock_id} from mssql")
-
-            # Each row holds all the high-frequency trading records of one stock on one day.
-            # Use multiprocessing to speed things up.
-
-            #with Pool(self.num_workers if num_workers is None else num_workers) as pool:
-            if pool is None:
-                print("Will create a new Pool object, but this is not encouraged for large batch work.")
-                pool = Pool(self.num_workers)
-
-            # Within a single stock, process the dates in parallel; this is memory-friendly, as it never loads the massive full history of several stocks at once.
-            with tqdm(total=num_rows, leave=False) as sub_pbar:
-                for _ in pool.imap_unordered(
-                    functools.partial(
-                        DDBHFTLoader.dump_stock_daily_to_ddb,
-                        type_name = type_name,
-                        stock_id = stock_id
-                    ),
-                    row_list
-                ):
-                    sub_pbar.update()
+        stat = """
+            select * from [Level2Bytes{mssql_type_name}].dbo.[{mssql_type_name}]
+            where S_INFO_WINDCODE='{stock_id}'
+        """.format(
+            mssql_type_name = self.mssql_name_dict[type_name],
+            stock_id = stock_id
+        )
+        row_list = list(conn.execute(stat).fetchall())
+
+        # If `_journal_dt` is not None, the journal shows this stock's data is partially done, so the dates must be checked one by one.
+        # Only rows whose dates are not already in `_journal_dt` are kept in `row_list`.
+        if _journal_dt is not None:
+            row_list = [row for row in row_list
+                        if pd.to_datetime(row[1]) not in _journal_dt.index]
+            print(f"Resume job for {stock_id}, with {len(row_list)} rows left.")
+
+        num_rows = len(row_list)
+        # If there are no rows, the data is empty and we can return right away.
+        if num_rows == 0:
+            return
+
+        if pbar:
+            #pbar.set_description(f"Did get the result set for stock {stock_id} from mssql")
+            pbar.set_description(f"Will work in parallel on dumping job on {stock_id} of len {num_rows}")
+        else:
+            print(f"Did get the result set for stock {stock_id} from mssql")
+
+        # Each row holds all the high-frequency trading records of one stock on one day.
+        # Use multiprocessing to speed things up.
+
+        #with Pool(self.num_workers if num_workers is None else num_workers) as pool:
+        if pool is None:
+            print("Will create a new Pool object, but this is not encouraged for large batch work.")
+            pool = Pool(self.num_workers)
+
+        # Within a single stock, process the dates in parallel; this is memory-friendly, as it never loads the massive full history of several stocks at once.
+        with tqdm(total=num_rows, leave=False) as sub_pbar:
+            for _ in pool.imap_unordered(
+                functools.partial(
+                    DDBHFTLoader.dump_stock_daily_to_ddb,
+                    type_name = type_name,
+                    stock_id = stock_id
+                ),
+                row_list
+            ):
+                sub_pbar.update()
 
         self.dump_journal_writer.write(f"{type_name},{stock_id},OK\n")
         self.dump_journal_writer.flush()
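
The pattern this patch applies -- one Pool and one database connection created
outside all loops, with fetched rows fanned out to worker processes -- is
sketched below in isolation. This is a minimal, hypothetical example, not code
from this repository: it uses the standard-library sqlite3 in place of the
SQLAlchemy mssql engine, and the names process_row, dump_all, and demo.db are
made up for illustration. As in the patch, rows are fetched in the parent
process over the single reused connection and handed to the workers as plain
tuples, since a live DB connection cannot be shared across processes.

    import functools
    import sqlite3
    from multiprocessing import Pool


    # Hypothetical per-row worker. It must be a top-level function (or a
    # functools.partial over one) so multiprocessing can pickle it.
    def process_row(row, table_name):
        key, value = row
        return f"{table_name}:{key}={value}"


    def dump_all(db_path, table_names, num_workers=4):
        # Create the Pool once, outside all loops: creating pools (or
        # connections) inside the loop leaks resources over a long batch run.
        with Pool(num_workers) as pool:
            # One connection, opened once and reused for every query below.
            conn = sqlite3.connect(db_path)
            try:
                for table_name in table_names:
                    rows = conn.execute(
                        f"SELECT key, value FROM {table_name}"
                    ).fetchall()
                    # Fan the fetched rows out to the worker processes;
                    # imap_unordered yields results as they complete, so the
                    # parent never buffers more than the current result set.
                    worker = functools.partial(process_row, table_name=table_name)
                    for result in pool.imap_unordered(worker, rows):
                        print(result)
            finally:
                conn.close()


    if __name__ == '__main__':
        # Tiny fixture so the sketch runs end to end.
        conn = sqlite3.connect('demo.db')
        conn.execute("CREATE TABLE IF NOT EXISTS t1 (key TEXT, value TEXT)")
        conn.execute("INSERT INTO t1 VALUES ('a', '1'), ('b', '2')")
        conn.commit()
        conn.close()
        dump_all('demo.db', ['t1'])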