main
Guofu Li 2 years ago
parent 1764036d81
commit 94314df941

@ -27,7 +27,7 @@ from .DDBLoader import DDBLoader
class DDBHFTLoader(DDBLoader): class DDBHFTLoader(DDBLoader):
""" """
0. 从sql-server中读取calendar数据并创建成员变量df_calendardf_calendar可以保存在本地pickle作为缓存 0. 从sql-server中读取calendar数据并创建成员变量df_calendardf_calendar可以保存在本地pickle作为缓存
|- `def make_calendar_df(self) -> df_calendar` |- `def make_calendar_df(self) -> df_calendar`
1. 创建ddb中的数据库分区性质从calendar数据中获取 1. 创建ddb中的数据库分区性质从calendar数据中获取
|- `def create_ddb_database(self, df_calendar) -> void` |- `def create_ddb_database(self, df_calendar) -> void`
@ -416,7 +416,7 @@ class DDBHFTLoader(DDBLoader):
with tqdm(total=num_rows, leave=False) as sub_pbar: with tqdm(total=num_rows, leave=False) as sub_pbar:
for _ in pool.imap_unordered( for _ in pool.imap_unordered(
functools.partial( functools.partial(
dump_stock_daily_to_ddb, DDBHFTLoader.dump_stock_daily_to_ddb,
type_name = type_name, type_name = type_name,
stock_id = stock_id stock_id = stock_id
), ),
@ -434,7 +434,7 @@ class DDBHFTLoader(DDBLoader):
用于做多进程录入ddb的函数 用于做多进程录入ddb的函数
""" """
blob = gzip.decompress(blob) blob = gzip.decompress(blob)
dataArray = eval(f"ProtoBuffEntitys.{type_name}Message_pb2.{type_name}Array()") dataArray = eval(f"{type_name}Message_pb2.{type_name}Array()")
dataArray.ParseFromString(blob) dataArray.ParseFromString(blob)
data_dict_list = [ data_dict_list = [
@ -452,10 +452,10 @@ class DDBHFTLoader(DDBLoader):
df = pd.DataFrame(data_dict_list) df = pd.DataFrame(data_dict_list)
#df['code'] = make_symbol(df['code']) #df['code'] = make_symbol(df['code'])
df['code'] = stock_id df['code'] = stock_id
df['m_nDate'] = self.make_date(df['m_nDate']) df['m_nDate'] = DDBLoader.make_date(df['m_nDate'])
df['m_nTime'] = df['m_nDate'] + self.make_time(df['m_nTime']) df['m_nTime'] = df['m_nDate'] + DDBLoader.make_time(df['m_nTime'])
for field_name in array_type_list: for field_name in array_type_list:
df[field_name] = self.make_nparray(df[field_name]) df[field_name] = DDBLoader.make_nparray(df[field_name])
#print(f"Did create ddb table for dataframe of shape {df.shape}") #print(f"Did create ddb table for dataframe of shape {df.shape}")
# self.make_table_skeleton(type_name, df.shape[0]) # self.make_table_skeleton(type_name, df.shape[0])
@ -468,7 +468,7 @@ class DDBHFTLoader(DDBLoader):
用于做多进程录入ddb的函数 用于做多进程录入ddb的函数
""" """
df_table_name = type_name df_table_name = type_name
df = make_stock_daily_df(row[2], type_name, stock_id) df = DDBHFTLoader.make_stock_daily_df(row[2], type_name, stock_id)
ddb_sess = ddb.session(DDBLoader.ddb_config['host'], 8848) ddb_sess = ddb.session(DDBLoader.ddb_config['host'], 8848)
ddb_sess.login(DDBLoader.ddb_config['username'], DDBLoader.ddb_config['password']) ddb_sess.login(DDBLoader.ddb_config['username'], DDBLoader.ddb_config['password'])
@ -477,8 +477,8 @@ class DDBHFTLoader(DDBLoader):
# 因为在做Tick数据的时候偶然发生'CHUNK[xxx] does not exist.',所以在这里使用`append!`函数代换一下试试 # 因为在做Tick数据的时候偶然发生'CHUNK[xxx] does not exist.',所以在这里使用`append!`函数代换一下试试
ddb_sess.run("append!(loadTable('{dbPath}', `{partitioned_table_name}), {df_table_name})".format( ddb_sess.run("append!(loadTable('{dbPath}', `{partitioned_table_name}), {df_table_name})".format(
#ddb_sess.run("tableInsert(loadTable('{dbPath}', `{partitioned_table_name}), {df_table_name})".format( #ddb_sess.run("tableInsert(loadTable('{dbPath}', `{partitioned_table_name}), {df_table_name})".format(
dbPath = DDBLoader.ddb_path, dbPath = DDBHFTLoader.ddb_path,
partitioned_table_name = type_name + DDBLoader.ddb_partition_table_suffix, partitioned_table_name = type_name + DDBHFTLoader.ddb_partition_table_suffix,
df_table_name = df_table_name df_table_name = df_table_name
)) ))

Loading…
Cancel
Save