Guofu Li 2 years ago
commit 731a15bb01

@@ -119,12 +119,15 @@ class DDBHFTLoader(DDBLoader):
         # The Pool object cannot be created repeatedly, so create it once at the outermost level of the loop and pass it in as an argument.
         with Pool(self.num_workers if num_workers is None else num_workers) as pool:
-            for hft_type_name in self.hft_type_list:
-                print('Will work on hft type:', hft_type_name)
-                with tqdm(stock_list) as pbar:
-                    for stock_id in pbar:
-                        pbar.set_description(f"Working on stock {stock_id}")
-                        self.dump_hft_to_ddb(hft_type_name, stock_id, pbar=pbar, pool=pool)
+            # Always reuse the connection object, to reduce the memory consumption.
+            with self.mssql_engine.connect() as conn:
+                # Loop through the stock list.
+                for hft_type_name in self.hft_type_list:
+                    print('Will work on hft type:', hft_type_name)
+                    with tqdm(stock_list) as pbar:
+                        for stock_id in pbar:
+                            pbar.set_description(f"Working on stock {stock_id}")
+                            self.dump_hft_to_ddb(hft_type_name, stock_id, conn, pbar=pbar, pool=pool)

     def _get_stock_date_list(self, cache=False):
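
The pattern this hunk introduces: both the process pool and the database connection are expensive, long-lived handles, so each is created exactly once at the outermost level and threaded down into `dump_hft_to_ddb`. A minimal, self-contained sketch of the same pattern; the engine URL, `process_one`, and `run_batch` are illustrative stand-ins, not part of this repository:

from multiprocessing import Pool
from sqlalchemy import create_engine, text

def process_one(item, conn, pool):
    # Stand-in for dump_hft_to_ddb: query through the shared connection,
    # fan work out through the shared pool.
    conn.execute(text("select 1"))
    pool.apply(abs, (item,))

def run_batch(items, num_workers=4):
    engine = create_engine("sqlite://")  # placeholder; the loader talks to mssql
    # Create both handles once, outside every loop ...
    with Pool(num_workers) as pool, engine.connect() as conn:
        for item in items:
            # ... and pass them down instead of re-creating them per item.
            process_one(item, conn=conn, pool=pool)

if __name__ == "__main__":
    run_batch([1, -2, 3])
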
@@ -354,7 +357,7 @@ class DDBHFTLoader(DDBLoader):
         print('-' * 80)

-    def dump_hft_to_ddb(self, type_name, stock_id, trade_date=None, pbar=None, pool=None):
+    def dump_hft_to_ddb(self, type_name, stock_id, conn, trade_date=None, pbar=None, pool=None):
         if (type_name, stock_id, 'OK') in self.dump_journal_df.index:
             message = f"Will skip ({type_name}, {stock_id}) as it appears in the dump journal."
             if pbar is None:
@@ -376,54 +379,55 @@ class DDBHFTLoader(DDBLoader):
         # After some experimentation, batch-querying per stock is acceptably efficient.
         # The index fields in mssql are (S_INFO_WINDCODE, TRADE_DT).
-        with self.mssql_engine.connect() as conn:
-            stat = """
-                select * from [Level2Bytes{mssql_type_name}].dbo.[{mssql_type_name}]
-                where S_INFO_WINDCODE='{stock_id}'
-            """.format(
-                mssql_type_name = self.mssql_name_dict[type_name],
-                stock_id = stock_id
-            )
-            row_list = list(conn.execute(stat).fetchall())
+        stat = """
+            select * from [Level2Bytes{mssql_type_name}].dbo.[{mssql_type_name}]
+            where S_INFO_WINDCODE='{stock_id}'
+        """.format(
+            mssql_type_name = self.mssql_name_dict[type_name],
+            stock_id = stock_id
+        )
+        row_list = list(conn.execute(stat).fetchall())

         # If `_journal_dt` is not None, the previous journal indicates this stock's data
         # was already partially dumped, so the dates need to be checked one by one.
         # Only rows whose date is not already in `_journal_dt` are kept in `row_list`.
         if _journal_dt is not None:
             row_list = [row for row in row_list
                         if pd.to_datetime(row[1]) not in _journal_dt.index]
             print(f"Resume job for {stock_id}, with {len(row_list)} rows left.")
+        del(_journal_dt)

         num_rows = len(row_list)
         # If the row count is 0 the data set is empty, so return right away.
         if num_rows == 0:
             return

         if pbar:
             #pbar.set_description(f"Did get the result set for stock {stock_id} from mssql")
             pbar.set_description(f"Will work in parallel on dumping job on {stock_id} of len {num_rows}")
         else:
             print(f"Did get the result set for stock {stock_id} from mssql")

         # Each row holds all the high-frequency trading records of a single stock on a single day.
         # Use multiprocessing to speed this up.
         #with Pool(self.num_workers if num_workers is None else num_workers) as pool:
         if pool is None:
             print("Will create new Pool object, but this is not encouraged for large batch work.")
             pool = Pool(self.num_workers)

         # Within a single stock, the different dates are processed in parallel; this is
         # memory-friendly, since the massive full history of several stocks never has to
         # be loaded at once.
         with tqdm(total=num_rows, leave=False) as sub_pbar:
             for _ in pool.imap_unordered(
                     functools.partial(
                         DDBHFTLoader.dump_stock_daily_to_ddb,
                         type_name = type_name,
                         stock_id = stock_id
                     ),
                     row_list
             ):
                 sub_pbar.update()
+        del(row_list)

         self.dump_journal_writer.write(f"{type_name},{stock_id},OK\n")
         self.dump_journal_writer.flush()
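
The parallel section kept by this hunk follows a common `multiprocessing` recipe: `functools.partial` pins the per-stock keyword arguments so only one row (one trading day) travels to each worker, and `imap_unordered` yields completions in whatever order workers finish. A toy version of the recipe; the worker function and data are hypothetical:

import functools
from multiprocessing import Pool
from tqdm import tqdm

def dump_one_day(row, type_name, stock_id):
    # Hypothetical stand-in for DDBHFTLoader.dump_stock_daily_to_ddb.
    return (type_name, stock_id, row)

if __name__ == "__main__":
    rows = list(range(10))  # each element stands for one trading day of one stock
    worker = functools.partial(dump_one_day, type_name='Tick', stock_id='000001.SZ')
    with Pool(4) as pool, tqdm(total=len(rows), leave=False) as sub_pbar:
        # imap_unordered hands back results as soon as any worker finishes;
        # order does not matter because every day is written independently.
        for _ in pool.imap_unordered(worker, rows):
            sub_pbar.update()
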
@@ -475,8 +479,8 @@ class DDBHFTLoader(DDBLoader):
         ddb_sess.upload({df_table_name : df})
         # Because 'CHUNK[xxx] does not exist.' occasionally occurred while dumping Tick data, try substituting the `append!` function here.
-        ddb_sess.run("append!(loadTable('{dbPath}', `{partitioned_table_name}), {df_table_name})".format(
-        #ddb_sess.run("tableInsert(loadTable('{dbPath}', `{partitioned_table_name}), {df_table_name})".format(
+        #ddb_sess.run("append!(loadTable('{dbPath}', `{partitioned_table_name}), {df_table_name})".format(
+        ddb_sess.run("tableInsert(loadTable('{dbPath}', `{partitioned_table_name}), {df_table_name})".format(
             dbPath = DDBHFTLoader.ddb_path,
             partitioned_table_name = type_name + DDBHFTLoader.ddb_partition_table_suffix,
             df_table_name = df_table_name
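
Both calls in this hunk are standard DolphinDB ways of appending an uploaded in-memory table to a partitioned table; the commit simply flips which one is active, after `append!` had been tried as a workaround for the sporadic 'CHUNK[xxx] does not exist.' error. A hedged sketch of the upload-then-insert step with the official `dolphindb` Python client; host, credentials, database path, and table names are placeholders:

import dolphindb as ddb
import pandas as pd

sess = ddb.session()
sess.connect('localhost', 8848, 'admin', '123456')  # placeholder credentials

df = pd.DataFrame({'code': ['000001.SZ'], 'price': [10.5]})
sess.upload({'tmp_table': df})  # ship the frame to the server under a name
# Append the uploaded table into the partitioned table; append! is the
# equivalent spelling this commit switches away from.
sess.run("tableInsert(loadTable('dfs://hft_db', `tick_snapshot), tmp_table)")
sess.close()
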
@@ -484,5 +488,6 @@ class DDBHFTLoader(DDBLoader):
         # Since this does not reuse the Session held by the `DDBHFTLoader` object,
         # memory will leak gradually unless the session is closed manually.
         ddb_sess.close()
+        del(df)
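
These two cleanup lines matter because this code runs inside long-lived pool workers: each task opens its own DolphinDB session rather than sharing the loader's, so it must be closed by hand, and dropping the per-day DataFrame right away keeps a worker's footprint flat across thousands of tasks. A sketch of that discipline; names and connection details are placeholders:

import dolphindb as ddb
import pandas as pd

def dump_one_day(row):
    df = pd.DataFrame({'price': [row]})  # stand-in for decoding one day's blob
    sess = ddb.session()
    sess.connect('localhost', 8848, 'admin', '123456')  # placeholder credentials
    try:
        sess.upload({'tmp_table': df})
        sess.run("tableInsert(loadTable('dfs://hft_db', `tick_snapshot), tmp_table)")
    finally:
        sess.close()  # not the loader's session, so it must be closed here
        del df        # release the frame eagerly inside the long-lived worker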

@@ -46,6 +46,10 @@ class DDBPITLoader(DDBLoader):
         'AppearAtDate' : ('appear_at_date', 'DATE')
     }

+    symbol_col_set = {
+        'S_PROFITNOTICE_STYLE'
+    }
+
     date_col_set = {
         'report_period',
         'appear_in_period',
@@ -136,6 +140,11 @@ class DDBPITLoader(DDBLoader):
                 mssql_col_name_list.append(_col_name)
                 ddb_col_name_list.append(_col_name)
                 ddb_col_type_list.append('DATE')
+            # Columns that need to be converted to SYMBOL.
+            elif _col_name in self.symbol_col_set:
+                mssql_col_name_list.append(_col_name)
+                ddb_col_name_list.append(_col_name)
+                ddb_col_type_list.append('SYMBOL')
             # Convert the type according to the mapping table.
             elif _col_type in self.ddb_type_mapping:
                 mssql_col_name_list.append(_col_name)
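
The new `elif` branch routes the columns listed in `symbol_col_set` to DolphinDB's SYMBOL type, which dictionary-encodes strings and is considerably cheaper than STRING for low-cardinality columns such as S_PROFITNOTICE_STYLE. A condensed sketch of the resulting dispatch; the sets and the mssql-to-DolphinDB mapping shown are illustrative stand-ins for the class attributes:

symbol_col_set = {'S_PROFITNOTICE_STYLE'}
date_col_set = {'report_period', 'appear_in_period', 'appear_at_date'}
ddb_type_mapping = {'numeric': 'DOUBLE', 'varchar': 'STRING'}  # illustrative

def ddb_column_type(col_name, col_type):
    if col_name in date_col_set:
        return 'DATE'
    if col_name in symbol_col_set:
        # Dictionary-encoded string: each distinct value is stored only once.
        return 'SYMBOL'
    if col_type in ddb_type_mapping:
        return ddb_type_mapping[col_type]
    raise ValueError(f"no DolphinDB type for {col_name} ({col_type})")

print(ddb_column_type('S_PROFITNOTICE_STYLE', 'varchar'))  # -> SYMBOL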
