main
Guofu Li 2 years ago
parent 083c66abb1
commit a64a112b46

1
.gitignore vendored

@ -5,6 +5,7 @@
.DS_Store .DS_Store
*.DS_Store *.DS_Store
# Byte-compiled / optimized / DLL files # Byte-compiled / optimized / DLL files
__pycache__/ __pycache__/
*.py[cod] *.py[cod]

File diff suppressed because it is too large Load Diff

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

@ -28,16 +28,118 @@
}, },
{ {
"cell_type": "code", "cell_type": "code",
"execution_count": 5, "execution_count": 4,
"metadata": {}, "metadata": {},
"outputs": [], "outputs": [],
"source": [ "source": [
"with engine.connect() as conn:\n", "with engine.connect() as conn:\n",
" stat = \"select [StockID], [date] from [StockDaily].[dbo].[DailyKLine] group by [StockID], [date]\"\n", " stat = \"select [StockID], [date] from [IndexDaily].[dbo].[DailyKLine] group by [StockID], [date]\"\n",
" rs = conn.execute(statd)\n", " rs = conn.execute(stat)\n",
" stock_date_list = [(stock_name, date) for stock_name, date in rs.fetchall()]" " stock_date_list = [(stock_name, date) for stock_name, date in rs.fetchall()]"
] ]
}, },
{
"cell_type": "code",
"execution_count": 6,
"metadata": {},
"outputs": [],
"source": [
"import pandas as pd\n",
"df = pd.DataFrame(stock_date_list, columns=['code', 'm_nDate'])"
]
},
{
"cell_type": "code",
"execution_count": 9,
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"<div>\n",
"<style scoped>\n",
" .dataframe tbody tr th:only-of-type {\n",
" vertical-align: middle;\n",
" }\n",
"\n",
" .dataframe tbody tr th {\n",
" vertical-align: top;\n",
" }\n",
"\n",
" .dataframe thead th {\n",
" text-align: right;\n",
" }\n",
"</style>\n",
"<table border=\"1\" class=\"dataframe\">\n",
" <thead>\n",
" <tr style=\"text-align: right;\">\n",
" <th></th>\n",
" <th></th>\n",
" </tr>\n",
" <tr>\n",
" <th>code</th>\n",
" <th>m_nDate</th>\n",
" </tr>\n",
" </thead>\n",
" <tbody>\n",
" <tr>\n",
" <th rowspan=\"5\" valign=\"top\">CSI000140</th>\n",
" <th>20120912</th>\n",
" </tr>\n",
" <tr>\n",
" <th>20120913</th>\n",
" </tr>\n",
" <tr>\n",
" <th>20120914</th>\n",
" </tr>\n",
" <tr>\n",
" <th>20120917</th>\n",
" </tr>\n",
" <tr>\n",
" <th>20120918</th>\n",
" </tr>\n",
" <tr>\n",
" <th>...</th>\n",
" <th>...</th>\n",
" </tr>\n",
" <tr>\n",
" <th rowspan=\"5\" valign=\"top\">SZ399995</th>\n",
" <th>20220829</th>\n",
" </tr>\n",
" <tr>\n",
" <th>20220830</th>\n",
" </tr>\n",
" <tr>\n",
" <th>20220831</th>\n",
" </tr>\n",
" <tr>\n",
" <th>20220901</th>\n",
" </tr>\n",
" <tr>\n",
" <th>20220902</th>\n",
" </tr>\n",
" </tbody>\n",
"</table>\n",
"<p>403101 rows × 0 columns</p>\n",
"</div>"
],
"text/plain": [
"Empty DataFrame\n",
"Columns: []\n",
"Index: [(CSI000140, 20120912), (CSI000140, 20120913), (CSI000140, 20120914), (CSI000140, 20120917), (CSI000140, 20120918), (CSI000140, 20120919), (CSI000140, 20120920), (CSI000140, 20120921), (CSI000140, 20120924), (CSI000140, 20120925), (CSI000140, 20120926), (CSI000140, 20120927), (CSI000140, 20120928), (CSI000140, 20121008), (CSI000140, 20121009), (CSI000140, 20121010), (CSI000140, 20121011), (CSI000140, 20121012), (CSI000140, 20121015), (CSI000140, 20121016), (CSI000140, 20121017), (CSI000140, 20121018), (CSI000140, 20121019), (CSI000140, 20121022), (CSI000140, 20121023), (CSI000140, 20121024), (CSI000140, 20121025), (CSI000140, 20121026), (CSI000140, 20121029), (CSI000140, 20121030), (CSI000140, 20121031), (CSI000140, 20121101), (CSI000140, 20121102), (CSI000140, 20121105), (CSI000140, 20121106), (CSI000140, 20121107), (CSI000140, 20121108), (CSI000140, 20121109), (CSI000140, 20121112), (CSI000140, 20121113), (CSI000140, 20121114), (CSI000140, 20121115), (CSI000140, 20121116), (CSI000140, 20121119), (CSI000140, 20121120), (CSI000140, 20121121), (CSI000140, 20121122), (CSI000140, 20121123), (CSI000140, 20121126), (CSI000140, 20121127), (CSI000140, 20121128), (CSI000140, 20121129), (CSI000140, 20121130), (CSI000140, 20121203), (CSI000140, 20121204), (CSI000140, 20121205), (CSI000140, 20121206), (CSI000140, 20121207), (CSI000140, 20121210), (CSI000140, 20121211), (CSI000140, 20121212), (CSI000140, 20121213), (CSI000140, 20121214), (CSI000140, 20121217), (CSI000140, 20121218), (CSI000140, 20121219), (CSI000140, 20121220), (CSI000140, 20121221), (CSI000140, 20121224), (CSI000140, 20121225), (CSI000140, 20121226), (CSI000140, 20121227), (CSI000140, 20121228), (CSI000140, 20121231), (CSI000140, 20130104), (CSI000140, 20130107), (CSI000140, 20130108), (CSI000140, 20130109), (CSI000140, 20130110), (CSI000140, 20130111), (CSI000140, 20130115), (CSI000140, 20130116), (CSI000140, 20130117), (CSI000140, 20130118), (CSI000140, 20130121), (CSI000140, 20130122), (CSI000140, 20130123), (CSI000140, 20130124), (CSI000140, 20130125), (CSI000140, 20130128), (CSI000140, 20130129), (CSI000140, 20130130), (CSI000140, 20130131), (CSI000140, 20130201), (CSI000140, 20130204), (CSI000140, 20130205), (CSI000140, 20130206), (CSI000140, 20130207), (CSI000140, 20130208), (CSI000140, 20130218), ...]\n",
"\n",
"[403101 rows x 0 columns]"
]
},
"execution_count": 9,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"df.set_index(['code', 'm_nDate']).sort_index()"
]
},
{ {
"cell_type": "code", "cell_type": "code",
"execution_count": 6, "execution_count": 6,

@ -0,0 +1,173 @@
{
"cells": [
{
"cell_type": "code",
"execution_count": 1,
"id": "118641f9-96c3-4fd3-aea2-7ecdae17492e",
"metadata": {},
"outputs": [],
"source": [
"import dolphindb as ddb\n",
"\n",
"sess = ddb.session('localhost', 8848)\n",
"sess.login('admin', '123456')"
]
},
{
"cell_type": "code",
"execution_count": 4,
"id": "44e3d35e-2e84-44fc-a3d2-00eaa0135460",
"metadata": {},
"outputs": [],
"source": [
"df = sess.run(\"\"\"\n",
" select code from loadTable(\"dfs://daily_stock_ts\", \"daily_kline\")\n",
" group by code order by code asc\n",
"\"\"\")\n",
"\n",
"df[\"entity_id\"] = df.index\n",
"df.set_index(\"code\", inplace=True)"
]
},
{
"cell_type": "code",
"execution_count": 5,
"id": "20a6066d-8efb-40fd-8a3b-3d848c8c0073",
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"<div>\n",
"<style scoped>\n",
" .dataframe tbody tr th:only-of-type {\n",
" vertical-align: middle;\n",
" }\n",
"\n",
" .dataframe tbody tr th {\n",
" vertical-align: top;\n",
" }\n",
"\n",
" .dataframe thead th {\n",
" text-align: right;\n",
" }\n",
"</style>\n",
"<table border=\"1\" class=\"dataframe\">\n",
" <thead>\n",
" <tr style=\"text-align: right;\">\n",
" <th></th>\n",
" <th>entity_id</th>\n",
" </tr>\n",
" <tr>\n",
" <th>code</th>\n",
" <th></th>\n",
" </tr>\n",
" </thead>\n",
" <tbody>\n",
" <tr>\n",
" <th>000001.SZ</th>\n",
" <td>0</td>\n",
" </tr>\n",
" <tr>\n",
" <th>000002.SZ</th>\n",
" <td>1</td>\n",
" </tr>\n",
" <tr>\n",
" <th>000004.SZ</th>\n",
" <td>2</td>\n",
" </tr>\n",
" <tr>\n",
" <th>000005.SZ</th>\n",
" <td>3</td>\n",
" </tr>\n",
" <tr>\n",
" <th>000006.SZ</th>\n",
" <td>4</td>\n",
" </tr>\n",
" <tr>\n",
" <th>...</th>\n",
" <td>...</td>\n",
" </tr>\n",
" <tr>\n",
" <th>871970.NE</th>\n",
" <td>5010</td>\n",
" </tr>\n",
" <tr>\n",
" <th>871981.NE</th>\n",
" <td>5011</td>\n",
" </tr>\n",
" <tr>\n",
" <th>872925.NE</th>\n",
" <td>5012</td>\n",
" </tr>\n",
" <tr>\n",
" <th>873169.NE</th>\n",
" <td>5013</td>\n",
" </tr>\n",
" <tr>\n",
" <th>873223.NE</th>\n",
" <td>5014</td>\n",
" </tr>\n",
" </tbody>\n",
"</table>\n",
"<p>5015 rows × 1 columns</p>\n",
"</div>"
],
"text/plain": [
" entity_id\n",
"code \n",
"000001.SZ 0\n",
"000002.SZ 1\n",
"000004.SZ 2\n",
"000005.SZ 3\n",
"000006.SZ 4\n",
"... ...\n",
"871970.NE 5010\n",
"871981.NE 5011\n",
"872925.NE 5012\n",
"873169.NE 5013\n",
"873223.NE 5014\n",
"\n",
"[5015 rows x 1 columns]"
]
},
"execution_count": 5,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"df"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "cc934d77-3cc1-458f-9e28-3f0715e9b87b",
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.8.13"
}
},
"nbformat": 4,
"nbformat_minor": 5
}

@ -18,8 +18,7 @@ import dolphindb as ddb
import dolphindb.settings as keys import dolphindb.settings as keys
import sqlalchemy as sa import sqlalchemy as sa
from .DDBLoader import DDBLoader
import ProtoBuffEntitys
class DDBDailyLoader(DDBLoader): class DDBDailyLoader(DDBLoader):
@ -45,6 +44,8 @@ class DDBDailyLoader(DDBLoader):
'DOUBLE' 'DOUBLE'
] ]
memory_table_name = 'daily_kline_mem'
partition_table_name = 'daily_kline'
def create_ddb_database(self): def create_ddb_database(self):
# TODO: daily数据库已经在DDBDailyFactor中被创建了 # TODO: daily数据库已经在DDBDailyFactor中被创建了
@ -62,7 +63,7 @@ class DDBDailyLoader(DDBLoader):
print('Did load database from', self.ddb_path) print('Did load database from', self.ddb_path)
def create_ddb_partition_table(self, memory_table_name, partition_table_name): def create_ddb_partition_table(self):
# TODO: 现在只做一个日频行情数据表,今后可能考虑把基本面数据也迁移过来 # TODO: 现在只做一个日频行情数据表,今后可能考虑把基本面数据也迁移过来
# 由于日频行情数据的表结构相对简单,所以直接把表结构写在这里代码里即可 # 由于日频行情数据的表结构相对简单,所以直接把表结构写在这里代码里即可
@ -84,37 +85,35 @@ class DDBDailyLoader(DDBLoader):
""".format( """.format(
ddb_daily_path = self.ddb_path, ddb_daily_path = self.ddb_path,
ddb_daily_dbname = self.ddb_dbname, ddb_daily_dbname = self.ddb_dbname,
memory_table_name = memory_table_name, memory_table_name = self.memory_table_name,
partition_table_name = partition_table_name, partition_table_name = self.partition_table_name,
)) ))
def create_ddb_memory_table(self, memory_table_name, capacity): def create_ddb_memory_table(self, capacity):
self.ddb_sess.run(""" self.ddb_sess.run("""
// 先创建一个空的内存表用来表征结构如果无需插入数据capacity可以设为10 // 先创建一个空的内存表用来表征结构如果无需插入数据capacity可以设为10
{memory_table_name} = table({capacity}:0, {col_names}, [{col_types}]); {memory_table_name} = table({capacity}:0, {col_names}, [{col_types}]);
""".format( """.format(
memory_table_name = memory_table_name, memory_table_name = self.memory_table_name,
capacity = capacity, capacity = capacity,
col_names = '`' + '`'.join(self.daily_kline_cols), col_names = '`' + '`'.join(self.daily_kline_cols),
col_types = ', '.join(self.daily_kline_col_types) col_types = ', '.join(self.daily_kline_col_types)
)) ))
def dump_daily_kline_to_ddb(self): def dump_to_ddb(self):
# 先创建一个分区表,然后再逐个股票的数据插入 # 先创建一个分区表,然后再逐个股票的数据插入
# 1. 需要额外控制在插入第一个股票数据的时候创建分区表比较麻烦 # 1. 需要额外控制在插入第一个股票数据的时候创建分区表比较麻烦
# 2. python程序中的dataframe直接上传到dolphindb内存表不需要考虑内存表字段类型分区表中设置好即可 # 2. python程序中的dataframe直接上传到dolphindb内存表不需要考虑内存表字段类型分区表中设置好即可
memory_table_name = 'daily_kline_mem'
partition_table_name = 'daily_kline'
self.create_ddb_memory_table(memory_table_name, 10) self.create_ddb_memory_table(10)
print('Did create ddb memory table.') print('Did create ddb memory table.')
pprint(self.ddb_sess.run(f"schema({memory_table_name})")) pprint(self.ddb_sess.run(f"schema({self.memory_table_name})"))
self.create_ddb_partition_table(memory_table_name, partition_table_name) self.create_ddb_partition_table()
print('Did create ddb partition table.') print('Did create ddb partition table.')
pprint(self.ddb_sess.run(f"schema({partition_table_name})")) pprint(self.ddb_sess.run(f"schema({self.partition_table_name})"))
with self.mssql_engine.connect() as conn: with self.mssql_engine.connect() as conn:
stat = "select distinct [StockID] from [StockDaily].dbo.[DailyKLine]" stat = "select distinct [StockID] from [StockDaily].dbo.[DailyKLine]"
@ -137,11 +136,118 @@ class DDBDailyLoader(DDBLoader):
df = pd.DataFrame(row_list) df = pd.DataFrame(row_list)
df['date'] = DDBLoader.make_date(df['date']) df['date'] = DDBLoader.make_date(df['date'])
df['StockID'] = DDBLoader.tscode_to_windcode(df['StockID']) df['StockID'] = DDBLoader.tscode_to_windcode(df['StockID'])
self.ddb_sess.upload({memory_table_name : df}) self.ddb_sess.upload({self.memory_table_name : df})
#print('Did upload dataframe to ddb.') #print('Did upload dataframe to ddb.')
#pprint(self.ddb_sess.run(f"schema({memory_table_name})")) #pprint(self.ddb_sess.run(f"schema({self.memory_table_name})"))
#break #break
self.ddb_sess.run(f"{partition_table_name}.tableInsert({memory_table_name})") self.ddb_sess.run(f"{self.partition_table_name}.tableInsert({self.memory_table_name})")
class DDBDailyFactorLoader(DDBDailyLoader):
daily_kline_cols = [
'code', 'm_nDate',
# 4种量价配合的因子
'trend_with_turnover', 'trend_with_amount',
"abs_trend_with_turnover", "abs_trend_with_amount",
# Alpha101中量价背离的因子
"alpha101_22"
]
daily_kline_col_types = [
'SYMBOL', 'DATE',
'DOUBLE', 'DOUBLE',
'DOUBLE', 'DOUBLE',
'DOUBLE'
]
memory_table_name = 'daily_factor_mem'
partition_table_name = 'daily_factor'
def dump_to_ddb(self):
self.create_ddb_memory_table(10)
print('Did create ddb memory table.')
pprint(self.ddb_sess.run(f"schema({self.memory_table_name})"))
self.create_ddb_partition_table()
print('Did create ddb partition table.')
pprint(self.ddb_sess.run(f"schema({self.partition_table_name})"))
df_list = [
self.alpha101_22,
self.trend_with_amount,
]
df = pd.concat(df_list, axis=1)
df.reset_index(inplace=True)
print('Did prepare the dataframe for insertion:')
print(df.head())
self.ddb_sess.upload({"tbl": df})
self.ddb_sess.run("tableInsert(loadTable('dfs://daily_stock_ts', 'daily_factor'), tbl)")
pprint("Did dump data to partition table.")
@property
def alpha101_22(self):
sql = """
vol_20 = select code, m_nDate, mstdp(close, 20, 9) as vol_20
from loadTable("dfs://daily_stock_ts", "daily_kline")
context by code;
rank_vol_20 = select code, m_nDate, rank(vol_20, tiesMethod='average', percent=true)
from vol_20 context by m_nDate;
corr_5 = select code, m_nDate, mcorr(high, vol, 5) as corr_5
from loadTable("dfs://daily_stock_ts", "daily_kline")
context by code;
delta_corr_5 = select code, m_nDate, mfirst(corr_5, 5) - corr_5 as delta_corr_5
from corr_5 context by code;
alpha101_22 = select code, m_nDate, delta_corr_5 * rank_vol_20 as alpha101_22
from ej(rank_vol_20, delta_corr_5, `code`m_nDate);
alpha101_22;
"""
df = self.ddb_sess.run(sql)
df.set_index(["code", "m_nDate"], inplace=True)
return df
@property
def trend_with_amount(self):
factor_list = [
'trend_with_turnover', 'trend_with_amount',
"abs_trend_with_turnover", "abs_trend_with_amount",
]
factor_def = {
"trend_with_turnover": "(rank(amount/MarketValues, tiesMethod='average', percent=true) - 0.5) * winsorize(PctChg/20, 0.05)",
"trend_with_amount": "(rank(amount, tiesMethod='average', percent=true) - 0.5) * winsorize(PctChg/20, 0.05)",
"abs_trend_with_turnover": "(rank(amount/MarketValues, tiesMethod='average', percent=true) - 0.5) * abs(winsorize(PctChg/20, 0.05))",
"abs_trend_with_amount": "(rank(amount, tiesMethod='average', percent=true) - 0.5) * abs(winsorize(PctChg/20, 0.05))",
}
cols = ", ".join([
f"{factor_def[factor_name]} as {factor_name}" \
for factor_name in factor_list
])
sql = f"""
select code, m_nDate, {cols}
from loadTable("dfs://daily_stock_ts", "daily_kline")
context by m_nDate
"""
print("factor sql for trend with amount: " + sql)
df = self.ddb_sess.run(sql)
df.set_index(["code", "m_nDate"], inplace=True)
return df
def main(): def main():
@ -150,9 +256,11 @@ def main():
# 可以使用`Fire`库,对函数调用再做一次封装,就可以避免每次运行不同参数时候需要修改内部多处的代码。 # 可以使用`Fire`库,对函数调用再做一次封装,就可以避免每次运行不同参数时候需要修改内部多处的代码。
# 日频行情数据 # 日频行情数据
loader = DDBDailyLoader() #loader = DDBDailyLoader()
loader = DDBDailyFactorLoader()
loader.load_ddb_database() loader.load_ddb_database()
#loader.dump_daily_kline_to_ddb() loader.dump_to_ddb()
if __name__ == '__main__': if __name__ == '__main__':

@ -0,0 +1,12 @@
from .DDBLoader import DDBLoader
class DDBEntityLoader(DDBLoader):
ddb_path = "dfs://daily_stock_ts"
ddb_dbname = "db_daily_stock_ts"
def __init__(self, dtype, **kwargs):
pass

@ -0,0 +1,286 @@
import pickle
import functools
import warnings
from pprint import pprint
from pathlib import Path
from tqdm import tqdm
from multiprocessing import Pool
import numpy as np
import pandas as pd
import dolphindb as ddb
import dolphindb.settings as keys
import sqlalchemy as sa
from .DDBLoader import DDBLoader
class DDBIndexLoader(DDBLoader):
ddb_path = "dfs://daily_stock_ts"
ddb_dbname = "db_daily_stock_ts"
def __init__(self, dtype, **kwargs):
# TODO: 后续版本中,父类的构造函数里可能会增加一些设置项
super().__init__(**kwargs)
self.dtype = dtype
if dtype == "concept":
self.mem_tbl_name = "mem_idx_daily_concept"
self.part_tbl_name ="idx_daily_concept"
elif dtype == "kline":
self.mem_tbl_name = "mem_idx_daily_kline"
self.part_tbl_name ="idx_daily_kline"
else:
raise NotImplementedError(f"Unsupported `dtype` argument: {dtype}")
self.make_fields()
self.make_calendar_df()
def make_fields(self):
if self.dtype == "concept":
with self.mssql_engine.connect() as conn:
rs = conn.execute("select IndexID from [IndexInfo].[dbo].[Constituents] group by IndexID")
self.fields = [index_id for (index_id,) in rs.fetchall()]
elif self.dtype == "kline":
self.fields = ['open', 'high', 'low', 'close', 'vol', 'amount', 'yclose']
def make_calendar_df(self):
# 这里我们使用天软日K先数据表来构造交易日历
with self.mssql_engine.connect() as conn:
if self.dtype == "concept":
stat = "select [StockID], [date] from [StockDaily].[dbo].[DailyKLine] group by [StockID], [date]"
elif self.dtype == "kline":
stat = "select [StockID], [date] from [IndexDaily].[dbo].[DailyKLine] group by [StockID], [date]"
else:
raise NotImplementedError(f"Unsupported dtype: {self.dtype}")
rs = conn.execute(stat)
stock_date_list = [(stock_name, date) for stock_name, date in rs.fetchall()]
self.df_calendar = pd.DataFrame(stock_date_list, columns=['code', 'm_nDate'])
self.df_calendar['m_nDate'] = self.make_date(self.df_calendar['m_nDate'])
self.df_calendar['code'] = self.tscode_to_windcode(self.df_calendar['code'])
print('Did make the DataFrame for calendar')
print(self.df_calendar.head())
def load_ddb_database(self):
self.ddb_sess.run("""
{dbName} = database(directory='{dbPath}')
""".format(
dbName = self.ddb_dbname,
dbPath = self.ddb_path
))
print('Did load database from', self.ddb_path)
def create_ddb_partition_table(self):
if self.dtype == "concept":
self._create_ddb_memory_table_concept()
elif self.dtype == "kline":
self._create_ddb_memory_table_kline()
pprint(f"Did create memory table: {self.mem_tbl_name}")
#res = self.ddb_sess.run(f"schema({mem_tbl_name}).colDefs")
if self.ddb_sess.existsTable(self.ddb_path, self.part_tbl_name):
pprint(f"Will drop partition table: {self.part_tbl_name}")
self.ddb_sess.dropTable(self.ddb_path, self.part_tbl_name)
self.ddb_sess.run("""
{part_tbl_name} = {ddb_dbname}.createPartitionedTable(
table = {mem_tbl_name},
tableName = `{part_tbl_name},
partitionColumns = `code,
sortColumns = `code`m_nDate,
compressMethods = {{m_nDate:"delta"}}
)
""".format(
ddb_dbname = self.ddb_dbname,
part_tbl_name = self.part_tbl_name,
mem_tbl_name = self.mem_tbl_name
))
def _create_ddb_memory_table_concept(self):
concept_list = self.fields
col_name_list = ['code', 'm_nDate'] + concept_list
col_type_list = ['SYMBOL', 'DATE'] + ['BOOL'] * len(concept_list)
code = """
{mem_tbl_name} = table(
{capacity}:0,
{col_names},
[{col_types}]
);
""".format(
mem_tbl_name = self.mem_tbl_name,
capacity = 10,
col_names = '`' + '`'.join(col_name_list),
col_types = ','.join(col_type_list)
)
pprint(f"Will create mem table by:\n{code}")
self.ddb_sess.run(code)
def _create_ddb_memory_table_kline(self):
col_name_list = ['code', 'm_nDate'] + self.fields
col_type_list = ['SYMBOL', 'DATE'] + ['DOUBLE'] * len(self.fields)
code = """
{mem_tbl_name} = table(
{capacity} : 0,
{col_names},
[{col_types}]
);
""".format(
mem_tbl_name = self.mem_tbl_name,
capacity = 10,
col_names = '`' + '`'.join(col_name_list),
col_types = ','.join(col_type_list)
)
pprint(f"Will create mem table by:\n{code}")
self.ddb_sess.run(code)
def _make_idx_daily_kline(self):
with tqdm(self.df_calendar.groupby('code')) as pbar:
for wind_code, df_calendar_stock in pbar:
pbar.set_description(f"Will work on {wind_code}")
# 生成ts-code用于查询Sql-Server中天软的概念板块指数
ts_code = wind_code[-2:] + wind_code[:-3]
df_calendar_stock.set_index(['code', 'm_nDate'], inplace=True)
with self.mssql_engine.connect() as conn:
code = """
select
{field_list}
from
[IndexDaily].[dbo].[DailyKLine]
where
StockID='{index_id}'
""".format(
field_list = ','.join([f"[{field}]" for field in (['StockID', 'date'] + self.fields)]),
index_id = ts_code
)
rs = conn.execute(code)
row_list = rs.fetchall()
df = pd.DataFrame(row_list, columns=['code', 'm_nDate'] + self.fields)
df['code'] = self.tscode_to_windcode(df['code'])
df['m_nDate'] = self.make_date(df['m_nDate'])
df.set_index(['code', 'm_nDate'], inplace=True)
yield wind_code, df
def dump_idx_daily_kline_to_ddb(self):
for idx_id, df in self._make_idx_daily_kline():
df.reset_index(inplace=True)
#pprint(f"Will append to partiton table: \n{df}")
self.ddb_sess.upload({self.mem_tbl_name : df})
self.ddb_sess.run("""
append!(loadTable('{dbPath}', `{part_tbl_name}), {mem_tbl_name})
""".format(
dbPath = self.ddb_path,
part_tbl_name = self.part_tbl_name,
mem_tbl_name = self.mem_tbl_name
))
@staticmethod
def _mark_stock2concept_onehot(df_stock2concept, concept_id, start_date, end_date):
# 个股成为某个概念(指数)的起始日期是必定会提供的
# 但是截止日期可能缺失,确实一般意味着当前仍然是在此概念板块中
# 因此会通过将日期填充至最后一日来表示当前仍然在此概念板块内
if end_date is None or end_date == 0:
start_date = pd.to_datetime(str(start_date), format='%Y%m%d')
df_stock2concept.loc[df_stock2concept.index.get_level_values('m_nDate') >= start_date] = True
else:
start_date = pd.to_datetime(str(start_date), format='%Y%m%d')
end_date = pd.to_datetime(str(end_date), format='%Y%m%d')
df_stock2concept.loc[
(df_stock2concept.index.get_level_values('m_nDate') >= start_date) &
(df_stock2concept.index.get_level_values('m_nDate') <= end_date)
] = True
def _make_stock2concept_onehot(self):
# 从calendar中截取出与当前stock有关的日期然后设置成index
# 此处calendar使用的是海通高频数据构建因此股票代码为WIND-CODE
# 对calendar根据股票代码进行分组
with tqdm(self.df_calendar.groupby('code')) as pbar:
for wind_code, df_calendar_stock in pbar:
pbar.set_description(f"Will work on {wind_code}")
# 生成ts-code用于查询Sql-Server中天软的概念板块指数
ts_code = wind_code[-2:] + wind_code[:-3]
df_calendar_stock.set_index(['code', 'm_nDate'], inplace=True)
# 纵表转横表,`concept_list`作为列名
df_stock2concept = pd.DataFrame(
False, # one-hot横表初始化都是0后续根据Sql-Server的进出日期标注1
index=df_calendar_stock.index,
columns=concept_list,
dtype="bool"
)
# 从Sql-Server中读取`stock_id`所对应的概念板块进出日期
# 此数据是从天软指数数据中提取因此需要使用TSCODE
with self.mssql_engine.connect() as conn:
code = """
select
SecId, IndexID, EnterDate, ExitDate
from
[IndexInfo].[dbo].[Constituents]
where
SecID='{stock_id}'
""".format(
stock_id = ts_code
)
rs = conn.execute(code)
row_list = rs.fetchall()
# 从Sql-Server读取出单个股票的所有板块进出日期后开始进行标记
for (stock_id, concept_id, start_date, end_date) in row_list:
# mark the one-hot position one-by-one
self._mark_stock2concept_onehot(
df_stock2concept[concept_id],
concept_id,
start_date, end_date
)
# yield the marked one-hot dataframe for one stock
yield wind_code, df_stock2concept
def dump_idx_concept_to_ddb(self):
concept = self.fields
for stock_id, df in self._make_stock2concept_onehot(concept_list):
df.reset_index(inplace=True)
self.ddb_sess.upload({self.mem_tbl_name : df})
self.ddb_sess.run("""
append!(loadTable('{dbPath}', `{part_tbl_name}), {mem_tbl_name})
""".format(
dbPath = self.ddb_path,
part_tbl_name = self.part_tbl_name,
mem_tbl_name = self.mem_tbl_name
))

@ -0,0 +1,52 @@
from DDBIndexLoader import DDBIndexLoader
class DDBIndexLoaderWind(DDBIndexLoader):
def __init__(self, dtype, **kwargs):
# TODO: 后续版本中,父类的构造函数里可能会增加一些设置项
super().__init__(**kwargs)
self.dtype = dtype
if dtype == "concept":
self.mem_tbl_name = "mem_idx_daily_concept_wind"
self.part_tbl_name ="idx_daily_concept_wind"
elif dtype == "kline":
self.mem_tbl_name = "mem_idx_daily_kline_wind"
self.part_tbl_name ="idx_daily_kline_wind"
else:
raise NotImplementedError(f"Unsupported `dtype` argument: {dtype}")
self.make_fields()
self.make_calendar_df()
def make_fields(self):
if self.dtype == "concept":
with self.mssql_engine.connect() as conn:
rs = conn.execute("select [WIND_SEC_CODE] from [IndexInfo].[dbo].[Constituents] group by IndexID")
self.fields = [index_id for (index_id,) in rs.fetchall()]
elif self.dtype == "kline":
self.fields = ['open', 'high', 'low', 'close', 'vol', 'amount', 'yclose']
def make_calendar_df(self):
# 这里我们使用天软日K先数据表来构造交易日历
with self.mssql_engine.connect() as conn:
if self.dtype == "concept":
stat = "select [StockID], [date] from [StockDaily].[dbo].[DailyKLine] group by [StockID], [date]"
elif self.dtype == "kline":
stat = "select [StockID], [date] from [IndexDaily].[dbo].[DailyKLine] group by [StockID], [date]"
else:
raise NotImplementedError(f"Unsupported dtype: {self.dtype}")
rs = conn.execute(stat)
stock_date_list = [(stock_name, date) for stock_name, date in rs.fetchall()]
self.df_calendar = pd.DataFrame(stock_date_list, columns=['code', 'm_nDate'])
self.df_calendar['m_nDate'] = self.make_date(self.df_calendar['m_nDate'])
self.df_calendar['code'] = self.tscode_to_windcode(self.df_calendar['code'])
print('Did make the DataFrame for calendar')
print(self.df_calendar.head())

@ -3,14 +3,20 @@ from loader.DDBPITLoader import DDBPITLoader
from loader.DDBHFTLoader import DDBHFTLoader from loader.DDBHFTLoader import DDBHFTLoader
from loader.DDBBasicInfoLoader import DDBBasicInfoLoader from loader.DDBBasicInfoLoader import DDBBasicInfoLoader
from loader.DDBIndexLoader import DDBIndexLoader from loader.DDBIndexLoader import DDBIndexLoader
from loader.DDBDailyLoader import DDBDailyLoader, DDBDailyFactorLoader
def create_index_data(): def create_index_data():
# 板块指数数据 # 板块指数数据
loader = DDBIndexLoader(host='192.168.1.7') loader = DDBIndexLoader(dtype="kline", host='localhost')
loader.load_ddb_database() loader.load_ddb_database()
#mem_tbl_name, part_tbl_name, fields = loader.create_ddb_partition_table("concept")
#loader.dump_idx_concept_to_ddb(mem_tbl_name, part_tbl_name, concept_list)
# 指数日K线数据
loader.create_ddb_partition_table() loader.create_ddb_partition_table()
loader.dump_idx_concept_to_ddb() loader.dump_idx_daily_kline_to_ddb()
def create_hft_data(): def create_hft_data():
@ -46,7 +52,13 @@ def create_daily_kline_data():
# 日频行情数据 # 日频行情数据
loader = DDBDailyLoader() loader = DDBDailyLoader()
loader.load_ddb_database() loader.load_ddb_database()
loader.dump_daily_kline_to_ddb() loader.dump_to_ddb()
def create_daily_factor_data():
loader = DDBDailyFactorLoader(host="localhost")
loader.load_ddb_database()
loader.dump_to_ddb()
@ -54,7 +66,8 @@ def main():
# TODO: # TODO:
# 可以使用`Fire`库,对函数调用再做一次封装,就可以避免每次运行不同参数时候需要修改内部多处的代码。 # 可以使用`Fire`库,对函数调用再做一次封装,就可以避免每次运行不同参数时候需要修改内部多处的代码。
create_index_data() create_daily_factor_data()
#create_index_data()
#create_hft_data() #create_hft_data()
#create_pit_data() #create_pit_data()

Loading…
Cancel
Save