From bcb45b8f5fe679cf65e751c2d9456bbc9650d7aa Mon Sep 17 00:00:00 2001
From: Guofu Li
Date: Sat, 27 Aug 2022 14:26:53 +0800
Subject: [PATCH] Attempted to use DBConnection Pool to reduce memory
consumption, but it does not work well with the multi-processing architecture
of Python.
---
src/DDBBase.py | 2 +-
src/loader/DDBHFTLoader.py | 31 ++++++++++++++++++++++++-------
src/run.py | 4 ++--
3 files changed, 27 insertions(+), 10 deletions(-)
diff --git a/src/DDBBase.py b/src/DDBBase.py
index 368fb31..6252454 100644
--- a/src/DDBBase.py
+++ b/src/DDBBase.py
@@ -5,7 +5,7 @@ import dolphindb as ddb
class DDBBase(object):
ddb_config = {
- 'host' : '192.168.1.167',
+ 'host' : '192.168.1.7',
'username' : 'admin',
'password' : '123456',
}
diff --git a/src/loader/DDBHFTLoader.py b/src/loader/DDBHFTLoader.py
index cbd77ce..5360ea4 100644
--- a/src/loader/DDBHFTLoader.py
+++ b/src/loader/DDBHFTLoader.py
@@ -94,8 +94,10 @@ class DDBHFTLoader(DDBLoader):
num_workers = 8
default_table_capacity = 10000
+    # TODO: 这里需要引用自身文件的绝对路径,然后再寻找其父目录
ddb_dump_journal_fname = '../assets/ddb_dump_journal.csv'
+ ddb_sess_pool = ddb.DBConnectionPool(DDBLoader.ddb_config['host'], 8848, num_workers)
def init_ddb_database(self, df_calendar):
"""
@@ -432,8 +434,8 @@ class DDBHFTLoader(DDBLoader):
self.dump_journal_writer.flush()
- @staticmethod
- def make_stock_daily_df(blob, type_name, stock_id):
+ @classmethod
+ def make_stock_daily_df(cls, blob, type_name, stock_id):
"""
用于做多进程录入ddb的函数
"""
@@ -463,11 +465,17 @@ class DDBHFTLoader(DDBLoader):
#print(f"Did create ddb table for dataframe of shape {df.shape}")
# self.make_table_skeleton(type_name, df.shape[0])
+
+ del(blob)
+ del(dataArray)
+ del(data_dict_list)
+ del(array_type_list)
+
return df
- @staticmethod
- def dump_stock_daily_to_ddb(row, type_name, stock_id):
+ @classmethod
+ def dump_stock_daily_to_ddb(cls, row, type_name, stock_id):
"""
用于做多进程录入ddb的函数
"""
@@ -479,15 +487,24 @@ class DDBHFTLoader(DDBLoader):
ddb_sess.upload({df_table_name : df})
# 因为在做Tick数据的时候,偶然发生'CHUNK[xxx] does not exist.',所以在这里使用`append!`函数代换一下试试
- #ddb_sess.run("append!(loadTable('{dbPath}', `{partitioned_table_name}), {df_table_name})".format(
- ddb_sess.run("tableInsert(loadTable('{dbPath}', `{partitioned_table_name}), {df_table_name})".format(
+ ddb_sess.run("append!(loadTable('{dbPath}', `{partitioned_table_name}), {df_table_name})".format(
+ #ddb_sess.run("tableInsert(loadTable('{dbPath}', `{partitioned_table_name}), {df_table_name})".format(
dbPath = DDBHFTLoader.ddb_path,
partitioned_table_name = type_name + DDBHFTLoader.ddb_partition_table_suffix,
df_table_name = df_table_name
))
+ #cls.ddb_sess_pool.runTaskAsyn("""
+ # tableInsert{{ loadTable("{dbPath}", "{partitioned_table_name}") }}
+ #""".format(
+ # dbPath = DDBHFTLoader.ddb_path,
+ # partitioned_table_name = type_name + DDBHFTLoader.ddb_partition_table_suffix,
+ #), df).result()
+
# 由于不是复用`DDBHFTLoader`对象内部的Session,因此如果不手动关闭就会造成内存逐渐泄漏
- ddb_sess.close()
del(df)
+ ddb_sess.undefAll()
+ ddb_sess.close()
+ del(ddb_sess)
diff --git a/src/run.py b/src/run.py
index fa8c8a5..19aec21 100644
--- a/src/run.py
+++ b/src/run.py
@@ -46,8 +46,8 @@ def main():
# TODO:
# 可以使用`Fire`库,对函数调用再做一次封装,就可以避免每次运行不同参数时候需要修改内部多处的代码。
- # create_hft_data()
- create_pit_data()
+ create_hft_data()
+ #create_pit_data()
if __name__ == '__main__':