From 95de99046afe6dd44b004c5379d36b22bc4bc2f7 Mon Sep 17 00:00:00 2001
From: Guofu Li
Date: Thu, 18 Aug 2022 14:33:47 +0800
Subject: [PATCH] 1. `DDBDailyFactor.py` => `DDBFactor.py` 2. Add a
 `DDBPITLoader` class to `DDBLoader.py`, dedicated to loading PIT data into
 DolphinDB 3. Other minor improvements
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
DDBDailyFactor.py => DDBFactor.py | 3 +
DDBLoader.py | 599 ++++++++++++---
ddb.ipynb | 33 +-
mssql.ipynb | 1151 ++++++++++++++++++++++++++++-
4 files changed, 1692 insertions(+), 94 deletions(-)
rename DDBDailyFactor.py => DDBFactor.py (97%)
diff --git a/DDBDailyFactor.py b/DDBFactor.py
similarity index 97%
rename from DDBDailyFactor.py
rename to DDBFactor.py
index 6c95703..c0470cb 100644
--- a/DDBDailyFactor.py
+++ b/DDBFactor.py
@@ -10,6 +10,9 @@ from DDBLoader import DDBLoader
def load_ddb_table(hft_tbl_name):
+ """
+ Syntactic sugar meant to simplify pre-loading a partitioned table; however, there are not many scenarios that need the table loaded in advance, so the benefit is limited.
+ """
def decorator(func):
@functools.wraps(func)
def wrapper(self, *args, **kwargs):
diff --git a/DDBLoader.py b/DDBLoader.py
index e5d8f0e..e26c465 100644
--- a/DDBLoader.py
+++ b/DDBLoader.py
@@ -2,6 +2,8 @@ import importlib
import gzip
import pickle
import functools
+import abc
+import warnings
from pprint import pprint
from pathlib import Path
@@ -11,6 +13,8 @@ from multiprocessing import Pool
import numpy as np
import pandas as pd
+from pandas.core.common import SettingWithCopyWarning
+warnings.simplefilter(action="ignore", category=SettingWithCopyWarning)
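+# (Assumption) the warning would otherwise be raised by helpers such as make_date(), which assign into a column slice.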
import dolphindb as ddb
import dolphindb.settings as keys
@@ -20,84 +24,465 @@ import sqlalchemy as sa
import ProtoBuffEntitys
-def make_stock_daily_df(blob, type_name, stock_id):
+
+class DDBLoader(object):
"""
- Worker function used for loading data into ddb with multiprocessing
+ - Holds a few shared configuration fields, including:
+     1. SQL Server connection parameters
+     2. DolphinDB connection parameters
+
+ - Declares a few @abstractmethod members (of limited use unless polymorphism is actually needed):
+     1. create_ddb_database
+     2. create_ddb_partition_table
"""
- blob = gzip.decompress(blob)
- dataArray = eval(f"ProtoBuffEntitys.{type_name}Message_pb2.{type_name}Array()")
- dataArray.ParseFromString(blob)
- data_dict_list = [
- {field.name : val for field, val in entry.ListFields()}
- for entry in dataArray.dataArray
+ mssql_config = {
+ 'host' : '192.168.1.7',
+ 'username' : 'sa',
+ 'password' : 'passw0rd!'
+ }
+
+ ddb_config = {
+ 'host' : '192.168.1.167',
+ 'username' : 'admin',
+ 'password' : '123456'
+ }
+
+
+ def __init__(self):
+ self.mssql_engine = sa.create_engine(
+ "mssql+pyodbc://{username}:{password}@{host}/master?driver=ODBC+Driver+18+for+SQL+Server".format(**self.mssql_config),
+ connect_args = {
+ "TrustServerCertificate": "yes"
+ }, echo=False
+ )
+
+ self.ddb_sess = ddb.session(self.ddb_config['host'], 8848)
+ self.ddb_sess.login(self.ddb_config['username'], self.ddb_config['password'])
+
+
+ @abc.abstractmethod
+ def create_ddb_database(self, *args, **kwargs):
+ """
+ Create the database; must be implemented by concrete subclasses.
+ """
+ return
+
+ @abc.abstractmethod
+ def create_ddb_partition_table(self, *args, **kwargs):
+ """
+ Create the partitioned table; must be implemented by concrete subclasses.
+ """
+ return
+
+
+ @staticmethod
+ def tscode_to_windcode(series):
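+ # e.g. 'SH600519' -> '600519.SH'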
+ return series.apply(lambda x : x[2:] + '.' + x[:2])
+
+
+ @staticmethod
+ def make_symbol(series):
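+ # e.g. 600519 -> '600519.SH', 862 -> '000862.SZ'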
+ return series.astype('int32').astype('str')\
+ .apply(str.zfill, args=(6,))\
+ .apply(lambda code : \
+ code + '.SH' if code[0] == '6' \
+ else code + '.SZ')
+
+
+ @staticmethod
+ def make_date(series):
+ # In the dividend table in particular, key dates that have not been announced yet are filled with 0, which would break date parsing.
+ series.loc[series == 0] = np.nan
+ return pd.to_datetime(
+ series.astype(str), format='%Y%m%d')
+
+
+ @staticmethod
+ def make_nparray(series):
+ return series.apply(lambda x : np.array(x))
+
+
+ @staticmethod
+ def make_time(series):
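+ # m_nTime is assumed to be an integer of the form H(H)MMSSmmm,
+ # e.g. 93059123 -> 09:30:59.123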
+ s_hr = series // 10000000 * 3600000
+ s_min = series % 10000000 // 100000 * 60000
+ s_sec = series % 100000 // 1000 * 1000  # seconds converted to milliseconds
+ s_ms = series % 1000
+ return pd.to_timedelta(s_hr + s_min + s_sec + s_ms, unit='ms')
+
+
+
+class DDBPITLoader(DDBLoader):
+
+ ddb_path = "dfs://pit_stock_ts"
+ ddb_dbname = "ddb_pit_stock_ts"
+
+ num_code_partition = 50
+
+ table_name_mapping = {
+ #'CBS_AFTER_ADJ' : 'bs_common_adj',
+ #'CBS_BEFORE_ADJ' : 'bs_common_ori',
+ #'CCFS_AFTER_ADJ' : 'cfs_common_adj',
+ #'CCFS_BEFORE_ADJ' : 'cfs_common_ori',
+ #'CIS_AFTER_ADJ' : 'is_common_adj',
+ #'CIS_BEFORE_ADJ' : 'is_common_ori',
+ 'DIV_WIND' : 'divident',
+ #'EP_WIND' : 'earnings_preannouncement',
+ #'PEE_WIND' : 'preliminary_earnings_estimate'
+ }
+
+ meta_col_config = {
+ 'WIND_CODE' : ('code', 'SYMBOL'),
+ # Meta columns from the mssql table that do not need to be kept are set to None here.
+ 'IntCode' : None,
+ 'ACTUAL_ANN_DT' : None,
+ 'ReportPeriod' : ('report_period', 'DATE'),
+ 'AppearInPeriod' : ('appear_in_period', 'DATE'),
+ 'AppearAtDate' : ('appear_at_date', 'DATE')
+ }
+
+ date_col_set = {
+ 'report_period',
+ 'appear_in_period',
+ 'appear_at_date',
+ 'ReportPeriod',
+ 'AppearInPeriod',
+ 'AppearAtDate',
+ 'EQY_RECORD_DT',
+ 'EX_DT',
+ 'DVD_PAYOUT_DT',
+ 'S_DIV_PRELANDATE',
+ 'S_DIV_SMTGDATE',
+ 'DVD_ANN_DT',
+ 'S_DIV_PREANNDT'
+ }
+
+ ddb_type_mapping = {
+ 'float' : 'DOUBLE',
+ 'int' : 'INT',
+ 'text' : 'STRING',
+ 'varchar' : 'STRING',
+ 'str' : 'STRING'
+ }
+
+ # The fundamentals database currently lives on the 192.168.1.91 server.
+ mssql_config = {
+ 'host' : '192.168.1.91',
+ 'username' : 'sa',
+ 'password' : 'xn.123',
+ 'dbname' : 'tr_statement'
+ }
+
+
+ def __init__(self):
+ super().__init__()
+ # Re-create the mssql_engine object, since we now need to connect to the fundamentals database.
+ self.mssql_engine = sa.create_engine(
+ "mssql+pyodbc://{username}:{password}@{host}/{dbname}?driver=ODBC+Driver+18+for+SQL+Server".format(**self.mssql_config),
+ connect_args = {
+ "TrustServerCertificate": "yes"
+ }, echo=False
+ )
+
+
+ def create_ddb_database(self):
+ self.ddb_sess.run("""
+ {dbName} = database(
+ directory = '{dbPath}',
+ partitionType = HASH,
+ partitionScheme = [SYMBOL, {num_code_partition}],
+ engine = 'TSDB'
+ )
+ """.format(
+ dbName = self.ddb_dbname,
+ dbPath = self.ddb_path,
+ num_code_partition = self.num_code_partition
+ ))
+
+
+ def _make_col_config(self, mssql_table_name):
+ """
+ Return:
+ mssql_col_name_list, ddb_col_name_list, ddb_col_type_list
+ """
+ with self.mssql_engine.connect() as conn:
+ col_sp_list = list(conn.execute(f"exec sp_columns {mssql_table_name}").fetchall())
+
+ mssql_col_name_list, ddb_col_name_list, ddb_col_type_list = \
+ [], [], []
+
+ for col_sp in col_sp_list:
+ _col_name = col_sp[3]
+ _col_type = col_sp[5]
+
+ # Meta columns are handled according to the meta config table.
+ if _col_name in self.meta_col_config:
+ # Skip meta columns from the mssql table that do not need to be kept.
+ if self.meta_col_config[_col_name] is None:
+ continue
+ # Both the column name and the column type are mapped.
+ mssql_col_name_list.append(_col_name)
+ ddb_col_name_list.append(self.meta_col_config[_col_name][0])
+ ddb_col_type_list.append(self.meta_col_config[_col_name][1])
+ # For non-meta columns, date columns are converted to DATE and the rest are mapped via ddb_type_mapping (float becomes DOUBLE).
+ else:
+ # Columns that will later be converted to DATE are usually stored as INT in the source table.
+ if _col_name in self.date_col_set:
+ mssql_col_name_list.append(_col_name)
+ ddb_col_name_list.append(_col_name)
+ ddb_col_type_list.append('DATE')
+ # Convert the type according to the mapping table.
+ elif _col_type in self.ddb_type_mapping:
+ mssql_col_name_list.append(_col_name)
+ ddb_col_name_list.append(_col_name)
+ ddb_col_type_list.append(self.ddb_type_mapping[_col_type])
+ # Column types missing from the mapping table are not added to the column lists at all.
+ else:
+ print(f"!**Unrecognized type '{_col_type}' for column {_col_name}, will skip.")
+
+ return mssql_col_name_list, ddb_col_name_list, ddb_col_type_list
+
+
+ def create_ddb_partition_table(self, mssql_table_name):
+ """创建分区表"""
+ memory_table_name = mssql_table_name
+ partition_table_name = self.table_name_mapping[mssql_table_name]
+
+ mssql_col_name_list, ddb_col_name_list, ddb_col_type_list = \
+ self._make_col_config(mssql_table_name)
+
+ # Pick the compression config depending on whether the table has an appear_in_period column.
+ if 'appear_in_period' in ddb_col_name_list:
+ compress_methods = """{
+ 'report_period' : 'delta',
+ 'appear_in_period' : 'delta',
+ 'appear_at_date' : 'delta'
+ }"""
+ else:
+ compress_methods = """{
+ 'report_period' : 'delta',
+ 'appear_at_date' : 'delta'
+ }"""
+
+ # The data has already been split into pre- and post-adjustment sets by `appear_in_period`, so there is no need to sort on it again.
+ sort_columns = "`code`report_period`appear_at_date"
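+ # In DolphinDB script, `code`report_period`appear_at_date is a literal vector of those three column names.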
+
+ # 1. First create an in-memory table with the column names and types set.
+ # 2. Then create the partitioned table from it, setting the partition column and related options.
+ self.ddb_sess.run("""
+ {memory_table_name} = table(
+ {capacity}:0,
+ {column_name_list},
+ [{column_type_list}]
+ );
+
+ if (existsTable("{ddb_path}", "{partition_table_name}")) {{
+ dropTable({ddb_dbname}, "{partition_table_name}");
+ }}
+
+ {partition_table_name} = createPartitionedTable(
+ dbHandle = {ddb_dbname},
+ table = {memory_table_name},
+ tableName = "{partition_table_name}",
+ partitionColumns = 'code',
+ compressMethods = {compress_methods},
+ sortColumns = {sort_columns}
+ );
+ """.format(
+ ddb_dbname = self.ddb_dbname,
+ ddb_path = self.ddb_path,
+ memory_table_name = memory_table_name,
+ partition_table_name = partition_table_name,
+ capacity = 10,
+ column_name_list = '`' + '`'.join(ddb_col_name_list),
+ column_type_list = ','.join(ddb_col_type_list),
+ compress_methods = compress_methods.replace('\n', '').replace(' ', ''),
+ sort_columns = sort_columns
+ ))
+ print('-' * 80)
+ print(f"Did create parition table <{partition_table_name}>:")
+ pprint(self.ddb_sess.run(f"schema({partition_table_name});"))
+ return partition_table_name, mssql_col_name_list
+
+
+ def create_ddb_partition_tables(self):
+ for mssql_table_name in self.table_name_mapping:
+ self.create_ddb_partition_table(mssql_table_name)
+
+
+ def _dump_pit_to_ddb(self, mssql_table_name):
+ print('Will work on table', mssql_table_name)
+ # The returned `mssql_col_name_list` is used to filter the columns of the dataframe fetched from SQL Server.
+ partition_table_name, mssql_col_name_list = \
+ self.create_ddb_partition_table(mssql_table_name)
+
+ with self.mssql_engine.connect() as conn:
+ stat = f"select distinct [WIND_CODE] from {mssql_table_name}"
+ stock_id_list = list(conn.execute(stat).fetchall())
+
+ with tqdm(stock_id_list) as pbar:
+ for (stock_id,) in pbar:
+ pbar.set_description(f"Will work on {stock_id}")
+ #pbar.set_description(f"Will fetch all data of {stock_id} from SQL Server")
+ stat = """
+ select * from {mssql_table_name}
+ where WIND_CODE='{stock_id}' and AppearAtDate>0
+ """.format(
+ mssql_table_name = mssql_table_name,
+ stock_id = stock_id
+ )
+ row_list = list(conn.execute(stat).fetchall())
+ num_rows = len(row_list)
+
+ # Because we filter on AppearAtDate, the result set may be empty.
+ if num_rows == 0:
+ print(f"Will skip {stock_id} due to empty result.")
+ continue
+
+ #pbar.set_description(f"Will work on dumping job on {stock_id} of len {num_rows}")
+ # Filter the columns returned by the select statement so that they match the partitioned table's columns.
+ df = pd.DataFrame(row_list)[mssql_col_name_list]
+ # Convert the int-typed date columns to the DATE type.
+ for df_col in df.columns:
+ if df_col in self.date_col_set:
+ df[df_col] = DDBLoader.make_date(df[df_col])
+ # The conversion was already done when the database view was built, so it is not repeated here.
+ #df['WIND_CODE'] = DDBLoader.tscode_to_windcode(df['WIND_CODE'])
+
+ self.ddb_sess.upload({mssql_table_name : df})
+ self.ddb_sess.run(f"{partition_table_name}.tableInsert({mssql_table_name})")
+
+
+ def dump_pit_to_ddb(self):
+ for mssql_table_name in self.table_name_mapping:
+ self._dump_pit_to_ddb(mssql_table_name)
+
+
+class DDBDailyLoader(DDBLoader):
+
+ ddb_path = "dfs://daily_stock_ts"
+ ddb_dbname = "db_daily_stock_ts"
+
+ daily_kline_cols = [
+ 'code', 'm_nDate',
+ 'open', 'high', 'low', 'close', 'vol',
+ 'amount', 'cjbs', 'yclose',
+ 'PctChg', 'IsZt', 'IsDt', 'IsST', 'IsGoDelist',
+ 'FloatShares', 'MarketValues',
+ 'factor'
]
- array_type_list = [
- field.name
- for field, val in dataArray.dataArray[0].ListFields()
- if isinstance(field.default_value, list)
+ daily_kline_col_types = [
+ 'SYMBOL', 'DATE',
+ 'DOUBLE', 'DOUBLE', 'DOUBLE', 'DOUBLE', 'DOUBLE',
+ 'DOUBLE', 'INT', 'DOUBLE',
+ 'DOUBLE', 'INT', 'INT', 'INT', 'INT',
+ 'DOUBLE', 'DOUBLE',
+ 'DOUBLE'
]
- #pprint(array_type_list)
- df = pd.DataFrame(data_dict_list)
- #df['code'] = make_symbol(df['code'])
- df['code'] = stock_id
- df['m_nDate'] = make_date(df['m_nDate'])
- df['m_nTime'] = df['m_nDate'] + make_time(df['m_nTime'])
- for field_name in array_type_list:
- df[field_name] = make_nparray(df[field_name])
- #print(f"Did create ddb table for dataframe of shape {df.shape}")
- # self.make_table_skeleton(type_name, df.shape[0])
- return df
+ def create_ddb_database(self):
+ # TODO: the daily database has already been created by DDBDailyFactor;
+ # it could be migrated here later, but we leave it as-is for now.
+ pass
-def dump_stock_daily_to_ddb(row, type_name, stock_id):
- """
- Worker function used for loading data into ddb with multiprocessing
- """
- df_table_name = type_name
- df = make_stock_daily_df(row[2], type_name, stock_id)
+ def load_ddb_database(self):
+ self.ddb_sess.run("""
+ {dbName} = database(directory='{dbPath}')
+ """.format(
+ dbName = self.ddb_dbname,
+ dbPath = self.ddb_path
+ ))
+ print('Did load database from', self.ddb_path)
- ddb_sess = ddb.session(DDBLoader.ddb_config['host'], 8848)
- ddb_sess.login(DDBLoader.ddb_config['username'], DDBLoader.ddb_config['password'])
- ddb_sess.upload({df_table_name : df})
- ddb_sess.run("tableInsert(loadTable('{dbPath}', `{partitioned_table_name}), {df_table_name})".format(
- dbPath = DDBLoader.ddb_path,
- partitioned_table_name = type_name + DDBLoader.ddb_partition_table_suffix,
- df_table_name = df_table_name
- ))
+ def create_ddb_partition_table(self, memory_table_name, partition_table_name):
+ # TODO: only the daily market-data table is handled for now; fundamentals data may be migrated here later.
+ # The daily K-line schema is simple enough to be written inline in the code here.
+ # Data should be migrated one stock at a time to avoid putting huge pressure on memory.
+ self.ddb_sess.run("""
+ // Make sure any existing table is dropped first
+ if (existsTable("{ddb_daily_path}", "{partition_table_name}")) {{
+ dropTable({ddb_daily_dbname}, "{partition_table_name}");
+ }}
+
+ // Then create the persistent partitioned table from the in-memory table's schema
+ {partition_table_name} = {ddb_daily_dbname}.createPartitionedTable(
+ table = {memory_table_name},
+ tableName = "{partition_table_name}",
+ partitionColumns = `code,
+ sortColumns = `code`m_nDate,
+ compressMethods = {{m_nDate:"delta"}}
+ );
+ """.format(
+ ddb_daily_path = self.ddb_path,
+ ddb_daily_dbname = self.ddb_dbname,
+ memory_table_name = memory_table_name,
+ partition_table_name = partition_table_name,
+ ))
+
+ def create_ddb_memory_table(self, memory_table_name, capacity):
+ self.ddb_sess.run("""
+ // First create an empty in-memory table to describe the schema; if no data needs to be inserted, a capacity of 10 is enough
+ {memory_table_name} = table({capacity}:0, {col_names}, [{col_types}]);
+ """.format(
+ memory_table_name = memory_table_name,
+ capacity = capacity,
+ col_names = '`' + '`'.join(self.daily_kline_cols),
+ col_types = ', '.join(self.daily_kline_col_types)
+ ))
-def make_symbol(series):
- return series.astype('int32').astype('str')\
- .apply(str.zfill, args=(6,))\
- .apply(lambda code : \
- code + '.SH' if code[0] == '6' \
- else code + '.SZ')
+ def dump_daily_kline_to_ddb(self):
+ # Create the partitioned table first, then insert the data one stock at a time.
+ # 1. Creating the partitioned table only when the first stock is inserted would need extra bookkeeping and is cumbersome.
+ # 2. A dataframe in the Python program can be uploaded straight into a DolphinDB in-memory table; the column types only need to be set on the partitioned table.
-def make_date(series):
- return pd.to_datetime(
- series.astype(str), format='%Y%m%d')
+ memory_table_name = 'daily_kline_mem'
+ partition_table_name = 'daily_kline'
+ self.create_ddb_memory_table(memory_table_name, 10)
+ print('Did create ddb memory table.')
+ pprint(self.ddb_sess.run(f"schema({memory_table_name})"))
+ self.create_ddb_partition_table(memory_table_name, partition_table_name)
+ print('Did create ddb partition table.')
+ pprint(self.ddb_sess.run(f"schema({partition_table_name})"))
-def make_nparray(series):
- return series.apply(lambda x : np.array(x))
+ with self.mssql_engine.connect() as conn:
+ stat = "select distinct [StockID] from [StockDaily].dbo.[DailyKLine]"
+ stock_id_list = list(conn.execute(stat).fetchall())
+
+ with tqdm(stock_id_list) as pbar:
+ for (stock_id,) in pbar:
+ pbar.set_description(f"Will work on {stock_id}")
+ #pbar.set_description(f"Will fetch all data of {stock_id} from SQL Server")
+ stat = """
+ select * from [StockDaily].dbo.[DailyKLine]
+ where StockID='{stock_id}'
+ """.format(
+ stock_id = stock_id
+ )
+ row_list = list(conn.execute(stat).fetchall())
+ num_rows = len(row_list)
+ #pbar.set_description(f"Will work on dumping job on {stock_id} of len {num_rows}")
+ df = pd.DataFrame(row_list)
+ df['date'] = DDBLoader.make_date(df['date'])
+ df['StockID'] = DDBLoader.tscode_to_windcode(df['StockID'])
+ self.ddb_sess.upload({memory_table_name : df})
+ #print('Did upload dataframe to ddb.')
+ #pprint(self.ddb_sess.run(f"schema({memory_table_name})"))
+ #break
+ self.ddb_sess.run(f"{partition_table_name}.tableInsert({memory_table_name})")
-def make_time(series):
- s_hr = series // 10000000 * 3600000
- s_min = series % 10000000 // 100000 * 60000
- s_sec = series % 100000 // 1000
- s_ms = series % 1000
- return pd.to_timedelta(s_hr + s_min + s_sec + s_ms, unit='ms')
-class DDBLoader(object):
+class DDBHFTLoader(DDBLoader):
"""
0. Read the calendar data from SQL Server and build the member variable df_calendar, which can be cached locally as a pickle.
|- `def make_calendar_df(self) -> df_calendar`
@@ -162,18 +547,6 @@ class DDBLoader(object):
13 : 'INT',
}
- mssql_config = {
- 'host' : '192.168.1.7',
- 'username' : 'sa',
- 'password' : 'passw0rd!'
- }
-
- ddb_config = {
- 'host' : '192.168.1.7',
- 'username' : 'admin',
- 'password' : '123456'
- }
-
# this value may be used by factor makers, which may loop through code partitions
num_code_partition = 50
@@ -182,18 +555,6 @@ class DDBLoader(object):
ddb_dump_journal_fname = 'ddb_dump_journal.csv'
- def __init__(self):
- self.mssql_engine = sa.create_engine(
- "mssql+pyodbc://{username}:{password}@{host}/master?driver=ODBC+Driver+18+for+SQL+Server".format(**self.mssql_config),
- connect_args = {
- "TrustServerCertificate": "yes"
- }, echo=False
- )
-
- self.ddb_sess = ddb.session(self.ddb_config['host'], 8848)
- self.ddb_sess.login(self.ddb_config['username'], self.ddb_config['password'])
-
-
def init_ddb_database(self, df_calendar):
"""
1. 创建ddb_database
@@ -490,6 +851,7 @@ class DDBLoader(object):
print("Will create new Pool object, but this is not encourage for large batch work.")
pool = Pool(self.num_worker)
+ # Within a single stock, different dates are processed in parallel; this is memory-friendly because we never load the huge full history of many stocks at once.
with tqdm(total=num_rows, leave=False) as sub_pbar:
for _ in pool.imap_unordered(
functools.partial(
@@ -505,18 +867,85 @@ class DDBLoader(object):
self.dump_journal_writer.flush()
+ @staticmethod
+ def make_stock_daily_df(blob, type_name, stock_id):
+ """
+ Worker function used for loading data into ddb with multiprocessing.
+ """
+ blob = gzip.decompress(blob)
+ dataArray = eval(f"ProtoBuffEntitys.{type_name}Message_pb2.{type_name}Array()")
+ dataArray.ParseFromString(blob)
+
+ data_dict_list = [
+ {field.name : val for field, val in entry.ListFields()}
+ for entry in dataArray.dataArray
+ ]
+
+ array_type_list = [
+ field.name
+ for field, val in dataArray.dataArray[0].ListFields()
+ if isinstance(field.default_value, list)
+ ]
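+ # Repeated protobuf fields (their default_value is a list) are collected here and wrapped into np.ndarray below.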
+ #pprint(array_type_list)
+
+ df = pd.DataFrame(data_dict_list)
+ #df['code'] = make_symbol(df['code'])
+ df['code'] = stock_id
+ df['m_nDate'] = DDBLoader.make_date(df['m_nDate'])
+ df['m_nTime'] = df['m_nDate'] + DDBLoader.make_time(df['m_nTime'])
+ for field_name in array_type_list:
+ df[field_name] = DDBLoader.make_nparray(df[field_name])
+
+ #print(f"Did create ddb table for dataframe of shape {df.shape}")
+ # self.make_table_skeleton(type_name, df.shape[0])
+ return df
+
+
+ @staticmethod
+ def dump_stock_daily_to_ddb(row, type_name, stock_id):
+ """
+ Worker function used for loading data into ddb with multiprocessing.
+ """
+ df_table_name = type_name
+ df = DDBHFTLoader.make_stock_daily_df(row[2], type_name, stock_id)
+
+ ddb_sess = ddb.session(DDBLoader.ddb_config['host'], 8848)
+ ddb_sess.login(DDBLoader.ddb_config['username'], DDBLoader.ddb_config['password'])
+
+ ddb_sess.upload({df_table_name : df})
+ ddb_sess.run("tableInsert(loadTable('{dbPath}', `{partitioned_table_name}), {df_table_name})".format(
+ dbPath = DDBLoader.ddb_path,
+ partitioned_table_name = type_name + DDBLoader.ddb_partition_table_suffix,
+ df_table_name = df_table_name
+ ))
+
+
+
def main():
- loader = DDBLoader()
- df_calendar = loader.make_calendar_df()
- loader.init_ddb_database(df_calendar)
- print('Did finish init_ddb_database')
+ # PIT fundamentals data
+ loader = DDBPITLoader()
+ loader.create_ddb_database()
+ #loader.create_ddb_partition_tables()
+ loader.dump_pit_to_ddb()
+
+ # Daily market data
+ #loader = DDBDailyLoader()
+ #loader.load_ddb_database()
+ #loader.dump_daily_kline_to_ddb()
+
+
+ # High-frequency data
+ #df_calendar = loader.make_calendar_df()
+
+ #loader.init_ddb_database(df_calendar)
+ #print('Did finish init_ddb_database')
#loader.load_ddb_database()
#print('Did load ddb database')
- loader.init_ddb_table_data(df_calendar)
- print('Did finish init_table_data')
+ #loader.init_ddb_table_data(df_calendar)
+ #print('Did finish init_table_data')
if __name__ == '__main__':
diff --git a/ddb.ipynb b/ddb.ipynb
index 5cea5a3..95ce37c 100644
--- a/ddb.ipynb
+++ b/ddb.ipynb
@@ -12,7 +12,7 @@
},
{
"cell_type": "code",
- "execution_count": 9,
+ "execution_count": 66,
"id": "5d0f471e-682e-43cc-abdb-7e52f3bbd707",
"metadata": {},
"outputs": [],
@@ -23,30 +23,30 @@
},
{
"cell_type": "code",
- "execution_count": 7,
+ "execution_count": 67,
"id": "ed17fd0b-9b36-47e4-9ab6-11459a3621fb",
"metadata": {},
"outputs": [],
"source": [
"# backup(backup_path, sql_obj, force, parallel)\n",
"code = \"\"\"\n",
- " backup('/data/dolphindb/backup/',