diff --git a/src/DDBBase.py b/src/DDBBase.py index 368fb31..6252454 100644 --- a/src/DDBBase.py +++ b/src/DDBBase.py @@ -5,7 +5,7 @@ import dolphindb as ddb class DDBBase(object): ddb_config = { - 'host' : '192.168.1.167', + 'host' : '192.168.1.7', 'username' : 'admin', 'password' : '123456', } diff --git a/src/loader/DDBHFTLoader.py b/src/loader/DDBHFTLoader.py index cbd77ce..5360ea4 100644 --- a/src/loader/DDBHFTLoader.py +++ b/src/loader/DDBHFTLoader.py @@ -94,8 +94,10 @@ class DDBHFTLoader(DDBLoader): num_workers = 8 default_table_capacity = 10000 + # TODO: 这里需要饮用自身文件的绝对路径,然后再寻找其父目录 ddb_dump_journal_fname = '../assets/ddb_dump_journal.csv' + ddb_sess_pool = ddb.DBConnectionPool(DDBLoader.ddb_config['host'], 8848, num_workers) def init_ddb_database(self, df_calendar): """ @@ -432,8 +434,8 @@ class DDBHFTLoader(DDBLoader): self.dump_journal_writer.flush() - @staticmethod - def make_stock_daily_df(blob, type_name, stock_id): + @classmethod + def make_stock_daily_df(cls, blob, type_name, stock_id): """ 用于做多进程录入ddb的函数 """ @@ -463,11 +465,17 @@ class DDBHFTLoader(DDBLoader): #print(f"Did create ddb table for dataframe of shape {df.shape}") # self.make_table_skeleton(type_name, df.shape[0]) + + del(blob) + del(dataArray) + del(data_dict_list) + del(array_type_list) + return df - @staticmethod - def dump_stock_daily_to_ddb(row, type_name, stock_id): + @classmethod + def dump_stock_daily_to_ddb(cls, row, type_name, stock_id): """ 用于做多进程录入ddb的函数 """ @@ -479,15 +487,24 @@ class DDBHFTLoader(DDBLoader): ddb_sess.upload({df_table_name : df}) # 因为在做Tick数据的时候,偶然发生'CHUNK[xxx] does not exist.',所以在这里使用`append!`函数代换一下试试 - #ddb_sess.run("append!(loadTable('{dbPath}', `{partitioned_table_name}), {df_table_name})".format( - ddb_sess.run("tableInsert(loadTable('{dbPath}', `{partitioned_table_name}), {df_table_name})".format( + ddb_sess.run("append!(loadTable('{dbPath}', `{partitioned_table_name}), {df_table_name})".format( + #ddb_sess.run("tableInsert(loadTable('{dbPath}', `{partitioned_table_name}), {df_table_name})".format( dbPath = DDBHFTLoader.ddb_path, partitioned_table_name = type_name + DDBHFTLoader.ddb_partition_table_suffix, df_table_name = df_table_name )) + #cls.ddb_sess_pool.runTaskAsyn(""" + # tableInsert{{ loadTable("{dbPath}", "{partitioned_table_name}") }} + #""".format( + # dbPath = DDBHFTLoader.ddb_path, + # partitioned_table_name = type_name + DDBHFTLoader.ddb_partition_table_suffix, + #), df).result() + # 由于不是复用`DDBHFTLoader`对象内部的Session,因此如果不手动关闭就会造成内存逐渐泄漏 - ddb_sess.close() del(df) + ddb_sess.undefAll() + ddb_sess.close() + del(ddb_sess) diff --git a/src/run.py b/src/run.py index fa8c8a5..19aec21 100644 --- a/src/run.py +++ b/src/run.py @@ -46,8 +46,8 @@ def main(): # TODO: # 可以使用`Fire`库,对函数调用再做一次封装,就可以避免每次运行不同参数时候需要修改内部多处的代码。 - # create_hft_data() - create_pit_data() + create_hft_data() + #create_pit_data() if __name__ == '__main__':