From 621b38dce394d138bf2ec368b7e63b78037edbd8 Mon Sep 17 00:00:00 2001
From: Guofu Li
Date: Tue, 16 Aug 2022 10:11:55 +0800
Subject: [PATCH] 1. Add `DDBDailyFactor.py`, which creates daily factors from
 HFT data. 2. Correct some typos in `DDBLoader.py`; this shouldn't affect its
 normal behaviour.

---
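Note: a minimal end-to-end sketch of how the new pipeline is meant to be
driven, assuming the names used in `DDBDailyFactor.main()` ("hft_daily_factor"
under "dfs://daily_stock_ts"); the host, credentials, and the stock code in
the final query are placeholders for your own deployment:

    import dolphindb as ddb
    from DDBDailyFactor import DailyFactor

    # Build the kurto factor and persist it into the partitioned daily table.
    factor = DailyFactor()
    factor.load_ddb_database()
    tbl = factor.make_kurto_memory_table()
    factor.create_factor_partition_table('hft_daily_factor', tbl)
    factor.append_to_partition_table('hft_daily_factor', tbl)

    # Read the factor back for one (hypothetical) stock code.
    sess = ddb.session('192.168.1.167', 8848)
    sess.login('admin', '123456')
    df = sess.run("""
        select * from loadTable("dfs://daily_stock_ts", "hft_daily_factor")
        where code = '000001.SZ'
    """)
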
 DDBDailyFactor.py | 216 ++++++++++++
 DDBLoader.py      |  17 +-
 ddb.ipynb         | 857 ++++++++++++++++++++++++++++++++++++++++++++++
 3 files changed, 1087 insertions(+), 3 deletions(-)
 create mode 100644 DDBDailyFactor.py
 create mode 100644 ddb.ipynb

diff --git a/DDBDailyFactor.py b/DDBDailyFactor.py
new file mode 100644
index 0000000..6c95703
--- /dev/null
+++ b/DDBDailyFactor.py
@@ -0,0 +1,216 @@
+
+from pprint import pprint
+from tqdm import tqdm
+import functools
+
+import dolphindb as ddb
+
+from DDBLoader import DDBLoader
+
+
+
+def load_ddb_table(hft_tbl_name):
+    def decorator(func):
+        @functools.wraps(func)
+        def wrapper(self, *args, **kwargs):
+            self.ddb_sess.run("""
+                // Load the raw data used in the computation: minute-bar data.
+                tbl = loadTable("{hft_ddb_path}", "{hft_tbl_name}");
+            """.format(
+                hft_ddb_path = DDBLoader.ddb_path,
+                hft_tbl_name = hft_tbl_name,
+            ))
+            print('Did load', hft_tbl_name)
+
+            return func(self, *args, **kwargs)
+        return wrapper
+    return decorator
+
+
+class DailyFactor(object):
+
+    #ddb_hft_path = "dfs://hft_stock_ts"
+    #ddb_hft_dbname = "db_hft_stock"
+    ddb_daily_path = "dfs://daily_stock_ts"
+    ddb_daily_dbname = "db_daily_stock"
+
+    ddb_config = {
+        'host' : '192.168.1.167',
+        'username' : 'admin',
+        'password' : '123456',
+    }
+
+    # The number of partitions here need not match that of the HFT table.
+    # When reading the HFT table, use `num_code_partition` from DDBLoader
+    # rather than this field.
+    num_code_partition = 50
+
+    def __init__(self):
+        self.ddb_sess = ddb.session(self.ddb_config['host'], 8848)
+        self.ddb_sess.login(self.ddb_config['username'], self.ddb_config['password'])
+
+
+    def create_ddb_database(self):
+        """
+        Daily-frequency data is small, so m_nDate can be used as the sort key
+        inside each partition and the database only needs a [HASH, 50]
+        partition on stock_id; no calendar data is required.
+        """
+        self.ddb_sess.run("""
+            daily_stock_ts = database(
+                "{ddb_daily_path}",
+                HASH, [SYMBOL, {num_code_partition}],
+                engine = 'TSDB'
+            )
+        """.format(
+            ddb_daily_path = self.ddb_daily_path,
+            num_code_partition = self.num_code_partition
+        ))
+        print('Did create database')
+
+
+    def load_ddb_database(self):
+        self.ddb_sess.run("""
+            {dbName} = database(
+                directory = '{dbPath}',
+                partitionType = HASH,
+                partitionScheme = [SYMBOL, {num_code_partition}],
+                engine = 'TSDB'
+            )
+        """.format(
+            dbName = self.ddb_daily_dbname,
+            dbPath = self.ddb_daily_path,
+            num_code_partition = self.num_code_partition
+        ))
+        print('Did load database.')
+
+
+    def append_factor_columns(self, factor_name_list, memory_tbl_name, partition_tbl_name):
+        code = """
+            addColumn({partition_tbl_name}, {col_name_list}, {col_type_list});
+        """.format(
+            partition_tbl_name = partition_tbl_name,
+            col_name_list = '`' + '`'.join(factor_name_list),
+            col_type_list = '[' + ','.join(['DOUBLE']*len(factor_name_list)) + ']'
+        )
+        print('Will add columns via script:')
+        print(code)
+        self.ddb_sess.run(code)
+
+        code = """
+            {partition_tbl_name}.tableInsert({memory_tbl_name})
+        """.format(
+            partition_tbl_name = partition_tbl_name,
+            memory_tbl_name = memory_tbl_name
+        )
+        print('Will append data via script:')
+        print(code)
+        self.ddb_sess.run(code)
+
+
+    def append_to_partition_table(self, partition_tbl_name, memory_tbl_name):
+        """
+        Append data from the in-memory table (`memory_tbl_name`) to the
+        partitioned table (`partition_tbl_name`).
+        """
+        self.ddb_sess.run("""
+            {partition_tbl_name}.tableInsert({memory_tbl_name})
+        """.format(
+            partition_tbl_name = partition_tbl_name,
+            memory_tbl_name = memory_tbl_name
+        ))
+
+
+    def create_factor_partition_table(self, partition_tbl_name, memory_tbl_name):
+        """
+        Create the partitioned table (`partition_tbl_name`) from the schema of
+        the in-memory table (`memory_tbl_name`).
+        """
+        # createPartitionedTable(
+        #     dbHandle, table, tableName,
+        #     [partitionColumns], [compressMethods],
+        #     [sortColumns], [keepDuplicates=ALL], [sortKeyMappingFunction])
+        code = """
+            // Make sure the new partitioned table does not clash with an existing one.
+            if (existsTable("{ddb_daily_path}", "{partition_tbl_name}")) {{
+                dropTable({ddb_daily_dbname}, "{partition_tbl_name}");
+            }}
+
+            {partition_tbl_name} = createPartitionedTable(
+                dbHandle = {ddb_daily_dbname},
+                table = {memory_tbl_name},
+                tableName = "{partition_tbl_name}",
+                partitionColumns = 'code',
+                compressMethods = {{'m_nDate' : 'delta'}},
+                sortColumns = `code`m_nDate
+            );
+        """.format(
+            ddb_daily_path = self.ddb_daily_path,
+            ddb_daily_dbname = self.ddb_daily_dbname,
+            partition_tbl_name = partition_tbl_name,
+            memory_tbl_name = memory_tbl_name,
+        )
+        print('Will create partitioned factor table via script:')
+        print(code)
+        self.ddb_sess.run(code)
+
+
+    @load_ddb_table("KLinePartitioned")
+    def make_kurto_memory_table(self):
+
+        memory_table_name = "kurto"
+        code_tpl = """
+            // First build the minute-bar return table.
+            // `context by` keeps the result as a per-group series.
+            // `where partition()` loads one partition at a time.
+            ret_sql = select
+                    code, m_nDate, eachPre(\, m_nClose)-1.0 as ret
+                from tbl
+                where partition(code, {partition_id})
+                context by m_nDate;
+
+            // Compute the kurto factor. The first record of each day in
+            // `ret_sql` is null, which does not seem to cause any problem.
+            kurto_sql = select
+                    code, m_nDate, sqrt(239) * sum(pow(ret, 3)) / pow(sum(pow(ret, 2)), 1.5) as kurto
+                from ret_sql
+                group by code, m_nDate;
+        """
+        with tqdm(range(DDBLoader.num_code_partition)) as pbar:
+        #with tqdm(range(1)) as pbar:
+            for partition_id in pbar:
+                self.ddb_sess.run(code_tpl.format(
+                    partition_id = partition_id,
+                ))
+
+                # The source table has 50 partitions that are processed one at
+                # a time, so create a temporary in-memory table first.
+                # Otherwise, once the first partition had been inserted into
+                # the partitioned table, later batches containing only a
+                # subset of the columns could no longer be inserted.
+                if partition_id == 0:
+                    self.ddb_sess.run("""
+                        {memory_table_name} = table(kurto_sql)
+                    """.format(
+                        memory_table_name = memory_table_name
+                    ))
+
+                # The `table` statement above only sets up the schema; the
+                # actual rows are inserted with `tableInsert`.
+                self.ddb_sess.run("""
+                    {memory_table_name}.tableInsert(kurto_sql)
+                """.format(
+                    memory_table_name = memory_table_name
+                ))
+        print('Did finish all partitions for kurto.')
+        pprint(self.ddb_sess.run(f"{memory_table_name}"))
+
+        return memory_table_name
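+
+
+    # Hypothetical sketch, not wired into `main()` below: any further daily
+    # factor can follow the same pattern, with `load_ddb_table` binding `tbl`
+    # to the partitioned minute-bar table and the partitions processed in a
+    # loop. The factor name (`rv`, a daily realized volatility) and its
+    # formula are illustrative assumptions, not part of the original design.
+    @load_ddb_table("KLinePartitioned")
+    def make_rv_memory_table(self):
+        memory_table_name = "rv"
+        code_tpl = """
+            ret_sql = select
+                    code, m_nDate, eachPre(\, m_nClose)-1.0 as ret
+                from tbl
+                where partition(code, {partition_id})
+                context by m_nDate;
+
+            // Realized volatility: root of the summed squared minute returns.
+            rv_sql = select
+                    code, m_nDate, sqrt(sum(pow(ret, 2))) as rv
+                from ret_sql
+                group by code, m_nDate;
+        """
+        for partition_id in range(DDBLoader.num_code_partition):
+            self.ddb_sess.run(code_tpl.format(partition_id = partition_id))
+            if partition_id == 0:
+                self.ddb_sess.run(f"{memory_table_name} = table(rv_sql)")
+            self.ddb_sess.run(f"{memory_table_name}.tableInsert(rv_sql)")
+        return memory_table_name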
+
+
+def main():
+    factor = DailyFactor()
+    factor.load_ddb_database()
+
+    memory_table_name = factor.make_kurto_memory_table()
+    factor.create_factor_partition_table(
+        'hft_daily_factor',
+        memory_table_name
+    )
+    factor.append_to_partition_table(
+        'hft_daily_factor',
+        memory_table_name
+    )
+
+
+if __name__ == '__main__':
+    main()
+
diff --git a/DDBLoader.py b/DDBLoader.py
index 0a90df3..e5d8f0e 100644
--- a/DDBLoader.py
+++ b/DDBLoader.py
@@ -21,6 +21,9 @@ import ProtoBuffEntitys
 
 
 def make_stock_daily_df(blob, type_name, stock_id):
+    """
+    Helper function for the multi-process ingestion into DDB.
+    """
     blob = gzip.decompress(blob)
     dataArray = eval(f"ProtoBuffEntitys.{type_name}Message_pb2.{type_name}Array()")
     dataArray.ParseFromString(blob)
@@ -51,6 +54,9 @@ def make_stock_daily_df(blob, type_name, stock_id):
 
 
 def dump_stock_daily_to_ddb(row, type_name, stock_id):
+    """
+    Helper function for the multi-process ingestion into DDB.
+    """
     df_table_name = type_name
     df = make_stock_daily_df(row[2], type_name, stock_id)
 
@@ -168,6 +174,9 @@ class DDBLoader(object):
         'password' : '123456'
     }
 
+    # This value may be used by factor makers, which may loop over the code partitions.
+    num_code_partition = 50
+
     num_workers = 8
     default_table_capacity = 10000
     ddb_dump_journal_fname = 'ddb_dump_journal.csv'
@@ -258,8 +267,10 @@ class DDBLoader(object):
 
         # Using DolphinDB script directly looks more convenient here.
         self.ddb_sess.run("""
-            db_stock = database("", 5, [SYMBOL, 50])
-        """)
+            db_stock = database("", 5, [SYMBOL, {num_code_partition}])
+        """.format(
+            num_code_partition = self.num_code_partition
+        ))
         #self.ddb_sess.run("""
         #    db_stock = database("", 1, symbol({partitions}))
         #""".format(
@@ -346,7 +357,7 @@ class DDBLoader(object):
                 dbHandle={ddb_dbname},
                 table={mem_table},
                 tableName=`{per_table},
-                sortCOlumns=`code`m_nDate,
+                sortColumns=`code`m_nDate,
                 compressMethods={{"m_nDate":"delta"}}
             ), {mem_table})
         """.format(
diff --git a/ddb.ipynb b/ddb.ipynb
new file mode 100644
index 0000000..5cea5a3
--- /dev/null
+++ b/ddb.ipynb
@@ -0,0 +1,857 @@
+{
+ "cells": [
+  {
+   "cell_type": "code",
+   "execution_count": 1,
+   "id": "7246e0c8-61cd-4cbf-a978-aa0dc0172d6d",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "import dolphindb as ddb"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 9,
+   "id": "5d0f471e-682e-43cc-abdb-7e52f3bbd707",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "sess = ddb.session('192.168.1.7', 8848)\n",
+    "sess.login('admin', '123456')"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 7,
+   "id": "ed17fd0b-9b36-47e4-9ab6-11459a3621fb",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# backup(backup_path, sql_obj, force, parallel)\n",
+    "code = \"\"\"\n",
+    "    backup('/data/dolphindb/backup/',