Attempted to use DBConnection Pool to reduce memory consumption, but it does not work well with the multi-processing architecture of Python.

main
Guofu Li 2 years ago
parent a34dee0175
commit bcb45b8f5f

@ -5,7 +5,7 @@ import dolphindb as ddb
class DDBBase(object):
ddb_config = {
'host' : '192.168.1.167',
'host' : '192.168.1.7',
'username' : 'admin',
'password' : '123456',
}

@ -94,8 +94,10 @@ class DDBHFTLoader(DDBLoader):
num_workers = 8
default_table_capacity = 10000
# TODO: 这里需要饮用自身文件的绝对路径,然后再寻找其父目录
ddb_dump_journal_fname = '../assets/ddb_dump_journal.csv'
ddb_sess_pool = ddb.DBConnectionPool(DDBLoader.ddb_config['host'], 8848, num_workers)
def init_ddb_database(self, df_calendar):
"""
@ -432,8 +434,8 @@ class DDBHFTLoader(DDBLoader):
self.dump_journal_writer.flush()
@staticmethod
def make_stock_daily_df(blob, type_name, stock_id):
@classmethod
def make_stock_daily_df(cls, blob, type_name, stock_id):
"""
用于做多进程录入ddb的函数
"""
@ -463,11 +465,17 @@ class DDBHFTLoader(DDBLoader):
#print(f"Did create ddb table for dataframe of shape {df.shape}")
# self.make_table_skeleton(type_name, df.shape[0])
del(blob)
del(dataArray)
del(data_dict_list)
del(array_type_list)
return df
@staticmethod
def dump_stock_daily_to_ddb(row, type_name, stock_id):
@classmethod
def dump_stock_daily_to_ddb(cls, row, type_name, stock_id):
"""
用于做多进程录入ddb的函数
"""
@ -479,15 +487,24 @@ class DDBHFTLoader(DDBLoader):
ddb_sess.upload({df_table_name : df})
# 因为在做Tick数据的时候偶然发生'CHUNK[xxx] does not exist.',所以在这里使用`append!`函数代换一下试试
#ddb_sess.run("append!(loadTable('{dbPath}', `{partitioned_table_name}), {df_table_name})".format(
ddb_sess.run("tableInsert(loadTable('{dbPath}', `{partitioned_table_name}), {df_table_name})".format(
ddb_sess.run("append!(loadTable('{dbPath}', `{partitioned_table_name}), {df_table_name})".format(
#ddb_sess.run("tableInsert(loadTable('{dbPath}', `{partitioned_table_name}), {df_table_name})".format(
dbPath = DDBHFTLoader.ddb_path,
partitioned_table_name = type_name + DDBHFTLoader.ddb_partition_table_suffix,
df_table_name = df_table_name
))
#cls.ddb_sess_pool.runTaskAsyn("""
# tableInsert{{ loadTable("{dbPath}", "{partitioned_table_name}") }}
#""".format(
# dbPath = DDBHFTLoader.ddb_path,
# partitioned_table_name = type_name + DDBHFTLoader.ddb_partition_table_suffix,
#), df).result()
# 由于不是复用`DDBHFTLoader`对象内部的Session因此如果不手动关闭就会造成内存逐渐泄漏
ddb_sess.close()
del(df)
ddb_sess.undefAll()
ddb_sess.close()
del(ddb_sess)

@ -46,8 +46,8 @@ def main():
# TODO:
# 可以使用`Fire`库,对函数调用再做一次封装,就可以避免每次运行不同参数时候需要修改内部多处的代码。
# create_hft_data()
create_pit_data()
create_hft_data()
#create_pit_data()
if __name__ == '__main__':

Loading…
Cancel
Save