Compare commits

...

5 Commits
1.0.0 ... main

1
.gitignore vendored

@ -5,6 +5,7 @@
.DS_Store
*.DS_Store
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]

@ -75,7 +75,50 @@
- 跨年同比指标计算(分母为去年基数取绝对值,函数会同时返回当年值和去年基数,以便进一步调整)
## 海通高频数据录入说明
## 一分钟K线数据校验
### 海通数据vs 旭诺MongoDB数据质量对比
MongoDB的数据问题比较严重，主要有以下几个大问题：
1. MongoDB的日内分钟数据加上了9:25集合竞价的数据，同时剔除了14:57-14:59尾盘集合竞价的数据，所以MongoDB的标准日内分钟是239分钟。但实际上，不同股票、不同交易日的日内分钟数据都有不同程度的丢失，导致股票日内记录数通常小于239。丢失集中在开盘前几分钟（这个问题比较严重，因为开盘往往对因子计算影响较大）、尾盘收盘时刻和日内离散的时刻。采样结果显示，20220121当天4609个股票中仅99只股票有15:00收盘时刻的数据。
2. 早年的数据甚至有整个交易日所有股票无数据的情况（例如2018、2019年），近年来少一些。
3. 有些股票的时间序列上有部分字段的数据缺失,例如只有开盘和收盘价,没有最高最低,或者有价格数据但无成交数据。
### 海通分钟数据和旭诺MongoDB映射表
| 海通字段 | MongoDB字段 | 映射规则(旭诺→海通) |
| -------------- | ---------- | ------------ |
| m\_nDate | nActionDay | Int型→date型 |
| m\_nTime | nTime | Int型→time型 |
| m\_nOpen | nOpen | nOpen/10000 |
| m\_nHigh | nHigh | nHigh/10000 |
| m\_nLow | nLow | nLow/10000 |
| m\_nClose | nClose | nClose/10000 |
| code | szXnCode | copy |
| m\_iVolume | iVolume | copy |
| m\_iTurover | iTurnover | copy |
| m\_nMatchItems | | |
另外还要注意以下几点:
1. 海通数据是前置的，旭诺数据是后置的，即前者14:56的数据反映的是14:56-14:57的成交情况，后者反映的是14:55-14:56的成交情况，所以在映射时要错位映射。
2. 第1点的逻辑也导致了海通数据没有11:30和15:00两个时间戳，而旭诺数据有，且旭诺数据将14:56-15:00的数据都包括在了15:00这个时间戳内。不过旭诺数据依然有9:30的时间戳，反映开盘行情。
### 海通数据vs天软数据
以天软数据为基准,海通数据主要有三点问题:
1. 历史上一大段时间缺失了001872、001914、689009三只股票，其中：
- 001872在20181226之前无数据，因为20181226更换了股票代码，之前的股票代码为000022
- 001914在20191216之前无数据，因为20191216更换了股票代码，之前的股票代码为000043
- 689009在20210402-20210528期间无数据，股票这期间并未更换股票代码，也没有停牌等其他重大事件
- 对于前两种情况，海通的处理逻辑是变更前和变更后的代码分开记录变更前后的数据，而天软的处理逻辑是保留变更前的记录，同时copy一份到变更后，并用变更后的股票代码继续记录变更后的数据，所以导致变更之前的时间戳上天软有变更后代码的数据，但海通没有。对于第三种情况，则属于__数据缺失__。
2. 海通数据多了很多停牌、未上市或者已退市但并未做剔除处理的数据，最多的时候单日多出来190只股票。这些未剔除的数据都是当天停牌的，显示为高开低收全天一样，且成交量、成交额、成交笔数全为0，唯一例外是20151123的600568.SH，在9:30有成交数据，但当天实际上是停牌的。由于海通对于停牌等情况的基本处理逻辑是剔除当天行情，整个行情理论上不包含停牌期的时间戳和数据，且在天软里面也完全没有退市、停牌、未上市期间的行情数据，所以这些多出来的股票数据属于__异常__。
3. 海通的数据相比于天软剔除了北交所的股票，但多出来了510300.SH, 510050.SH, 510500.SH, 159915.SZ, 159949.SZ 5个ETF。
- 不过目前这三个问题影响不算很大：对于1，可以在后续处理时将两个股票代码的行情进行拼接，并统一用变更后代码赋值code列，即可保证和天软数据一致；对于2，由于天软daily是准确的，可以通过天软daily数据进行过滤，或者对日内vol求和进行判断，和为0的属于异常点，计算因子时不予考虑；对于3，在数据处理时加上股票代码的正则表达式过滤。
## Appendix: 海通高频数据录入说明:
- Python语言从数据库中读取字节流程序并解码还原，示例程序在ReadTickFromDB.py中。
- 该函数需要pymssql、gzip、google.protobuf（可用pip install protobuf安装）三个依赖库，需要在运行程序前安装好（其中gzip为Python标准库，无需额外安装）。

File diff suppressed because it is too large Load Diff

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

@ -2,7 +2,7 @@
"cells": [
{
"cell_type": "code",
"execution_count": 214,
"execution_count": 3,
"id": "139fd1cb-aedf-4186-8408-4d630ba69599",
"metadata": {},
"outputs": [],
@ -13,323 +13,6 @@
"sess.login('admin', '123456')"
]
},
{
"cell_type": "code",
"execution_count": 216,
"id": "98ef95cf-0a7f-4e9d-8c7c-bc1d2a6268bd",
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"<div>\n",
"<style scoped>\n",
" .dataframe tbody tr th:only-of-type {\n",
" vertical-align: middle;\n",
" }\n",
"\n",
" .dataframe tbody tr th {\n",
" vertical-align: top;\n",
" }\n",
"\n",
" .dataframe thead th {\n",
" text-align: right;\n",
" }\n",
"</style>\n",
"<table border=\"1\" class=\"dataframe\">\n",
" <thead>\n",
" <tr style=\"text-align: right;\">\n",
" <th></th>\n",
" <th>tableName</th>\n",
" <th>physicalIndex</th>\n",
" </tr>\n",
" </thead>\n",
" <tbody>\n",
" <tr>\n",
" <th>0</th>\n",
" <td>divident</td>\n",
" <td>Gws</td>\n",
" </tr>\n",
" <tr>\n",
" <th>1</th>\n",
" <td>preliminary_earnings_estimate</td>\n",
" <td>FyP</td>\n",
" </tr>\n",
" <tr>\n",
" <th>2</th>\n",
" <td>is_common_ori</td>\n",
" <td>DvA</td>\n",
" </tr>\n",
" <tr>\n",
" <th>3</th>\n",
" <td>cfs_common_ori</td>\n",
" <td>AUH</td>\n",
" </tr>\n",
" <tr>\n",
" <th>4</th>\n",
" <td>bs_common_adj</td>\n",
" <td>x2R</td>\n",
" </tr>\n",
" <tr>\n",
" <th>5</th>\n",
" <td>earnings_preannouncement</td>\n",
" <td>EOu</td>\n",
" </tr>\n",
" <tr>\n",
" <th>6</th>\n",
" <td>cfs_common_adj</td>\n",
" <td>zCC</td>\n",
" </tr>\n",
" <tr>\n",
" <th>7</th>\n",
" <td>is_common_adj</td>\n",
" <td>Cdl</td>\n",
" </tr>\n",
" <tr>\n",
" <th>8</th>\n",
" <td>bs_common_ori</td>\n",
" <td>ykc</td>\n",
" </tr>\n",
" </tbody>\n",
"</table>\n",
"</div>"
],
"text/plain": [
" tableName physicalIndex\n",
"0 divident Gws\n",
"1 preliminary_earnings_estimate FyP\n",
"2 is_common_ori DvA\n",
"3 cfs_common_ori AUH\n",
"4 bs_common_adj x2R\n",
"5 earnings_preannouncement EOu\n",
"6 cfs_common_adj zCC\n",
"7 is_common_adj Cdl\n",
"8 bs_common_ori ykc"
]
},
"execution_count": 216,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"sess.run(\"listTables('dfs://pit_stock_ts')\")"
]
},
{
"cell_type": "code",
"execution_count": 217,
"id": "12d5328e-ee57-4cf6-adeb-863649cbb19d",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"{'partitionType': 5,\n",
" 'partitionColumnType': 17,\n",
" 'partitionColumnIndex': 0,\n",
" 'chunkPath': None,\n",
" 'colDefs': name typeString typeInt comment\n",
" 0 code SYMBOL 17 \n",
" 1 report_period DATE 6 \n",
" 2 appear_at_date DATE 6 \n",
" 3 S_PROFITNOTICE_STYLE STRING 18 \n",
" 4 S_PROFITNOTICE_CHANGEMIN DOUBLE 16 \n",
" 5 S_PROFITNOTICE_CHANGEMAX DOUBLE 16 \n",
" 6 S_PROFITNOTICE_NETPROFITMIN DOUBLE 16 \n",
" 7 S_PROFITNOTICE_NETPROFITMAX DOUBLE 16 \n",
" 8 S_PROFITNOTICE_REASON STRING 18 ,\n",
" 'chunkGranularity': 'TABLE',\n",
" 'partitionTypeName': 'HASH',\n",
" 'keepDuplicates': 'ALL',\n",
" 'engineType': 'TSDB',\n",
" 'partitionColumnName': 'code',\n",
" 'partitionSchema': 50,\n",
" 'sortColumns': array(['code', 'report_period', 'appear_at_date'], dtype=object),\n",
" 'partitionSites': None}"
]
},
"execution_count": 217,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"sess.run(\"\"\"\n",
" schema(loadTable(\"dfs://pit_stock_ts\", \"earnings_preannouncement\"))\n",
"\"\"\")"
]
},
{
"cell_type": "code",
"execution_count": 218,
"id": "46547093-9ee6-46ed-a821-73f3237b3c5e",
"metadata": {},
"outputs": [],
"source": [
"import sqlalchemy as sa\n",
"engine = sa.create_engine(\n",
" 'mssql+pyodbc://sa:xn.123@192.168.1.91/tr_statement?driver=ODBC+Driver+18+for+SQL+Server',\n",
" connect_args = {\n",
" \"TrustServerCertificate\": \"yes\"\n",
" }, echo=False)"
]
},
{
"cell_type": "code",
"execution_count": 224,
"id": "26b5701e-17e5-439e-8921-e7523384eee7",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"[('tr_statement', 'dbo', 'SRC_TS_DIVIDEND_ANNOUNCEMENT', 'BASE TABLE'),\n",
" ('tr_statement', 'dbo', 'SRC_TS_EARNINGS_PREANNOUNCEMENT', 'BASE TABLE'),\n",
" ('tr_statement', 'dbo', 'SRC_TS_PRELIMINARY_EARNING_EST', 'BASE TABLE'),\n",
" ('tr_statement', 'dbo', 'SRC_TS_FINCOMP_CASHFLOWSTATEMENT', 'BASE TABLE'),\n",
" ('tr_statement', 'dbo', 'SRC_TS_FINCOMP_INCOMESTATEMENT', 'BASE TABLE'),\n",
" ('tr_statement', 'dbo', 'SRC_TS_FINCOMP_BALANCESHEET', 'BASE TABLE'),\n",
" ('tr_statement', 'dbo', 'SRC_TS_CONSOLIDATED_CASHFLOWSTATEMENT', 'BASE TABLE'),\n",
" ('tr_statement', 'dbo', 'SRC_TS_CONSOLIDATED_INCOMESTATEMENT', 'BASE TABLE'),\n",
" ('tr_statement', 'dbo', 'SRC_TS_CONSOLIDATED_BALANCESHEET', 'BASE TABLE'),\n",
" ('tr_statement', 'dbo', 'CBS_META', 'VIEW'),\n",
" ('tr_statement', 'dbo', 'CBS_BEFORE_ADJ', 'VIEW'),\n",
" ('tr_statement', 'dbo', 'CBS_AFTER_ADJ', 'VIEW'),\n",
" ('tr_statement', 'dbo', 'CIS_META', 'VIEW'),\n",
" ('tr_statement', 'dbo', 'CIS_BEFORE_ADJ', 'VIEW'),\n",
" ('tr_statement', 'dbo', 'CIS_AFTER_ADJ', 'VIEW'),\n",
" ('tr_statement', 'dbo', 'CCFS_META', 'VIEW'),\n",
" ('tr_statement', 'dbo', 'CCFS_BEFORE_ADJ', 'VIEW'),\n",
" ('tr_statement', 'dbo', 'CCFS_AFTER_ADJ', 'VIEW'),\n",
" ('tr_statement', 'dbo', 'FBS_META', 'VIEW'),\n",
" ('tr_statement', 'dbo', 'FBS_BEFORE_ADJ', 'VIEW'),\n",
" ('tr_statement', 'dbo', 'FBS_AFTER_ADJ', 'VIEW'),\n",
" ('tr_statement', 'dbo', 'FIS_META', 'VIEW'),\n",
" ('tr_statement', 'dbo', 'FIS_BEFORE_ADJ', 'VIEW'),\n",
" ('tr_statement', 'dbo', 'FIS_AFTER_ADJ', 'VIEW'),\n",
" ('tr_statement', 'dbo', 'FCFS_META', 'VIEW'),\n",
" ('tr_statement', 'dbo', 'FCFS_BEFORE_ADJ', 'VIEW'),\n",
" ('tr_statement', 'dbo', 'FCFS_AFTER_ADJ', 'VIEW'),\n",
" ('tr_statement', 'dbo', 'PEE_META', 'VIEW'),\n",
" ('tr_statement', 'dbo', 'PEE_WIND', 'VIEW'),\n",
" ('tr_statement', 'dbo', 'EP_META', 'VIEW'),\n",
" ('tr_statement', 'dbo', 'EP_WIND', 'VIEW'),\n",
" ('tr_statement', 'dbo', 'DIV_META', 'VIEW'),\n",
" ('tr_statement', 'dbo', 'DIV_WIND', 'VIEW'),\n",
" ('tr_statement', 'dbo', 'SRC_TS_NON_RECURRING_PNL', 'BASE TABLE')]\n"
]
}
],
"source": [
"from pprint import pprint \n",
"\n",
"with engine.connect() as conn:\n",
" rs = conn.execute(\"\"\"\n",
"SELECT\n",
" *\n",
"FROM\n",
" tr_statement.INFORMATION_SCHEMA.TABLES;\n",
"\"\"\").fetchall()\n",
"\n",
"pprint(rs)"
]
},
{
"cell_type": "code",
"execution_count": 231,
"id": "2dcd793a-2442-4b4e-b261-b2cead090efc",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"[('tr_statement', 'dbo', 'SRC_TS_EARNINGS_PREANNOUNCEMENT', 'StockID', 1, 'char', 8, 8, None, None, 0, None, None, 1, None, 8, 1, 'NO', 47),\n",
" ('tr_statement', 'dbo', 'SRC_TS_EARNINGS_PREANNOUNCEMENT', 'StockName', 12, 'varchar', 20, 20, None, None, 1, None, None, 12, None, 20, 2, 'YES', 39),\n",
" ('tr_statement', 'dbo', 'SRC_TS_EARNINGS_PREANNOUNCEMENT', 'ReportPeriod', 4, 'int', 10, 4, 0, 10, 0, None, None, 4, None, None, 3, 'NO', 56),\n",
" ('tr_statement', 'dbo', 'SRC_TS_EARNINGS_PREANNOUNCEMENT', 'AppearAtDate', 4, 'int', 10, 4, 0, 10, 0, None, None, 4, None, None, 4, 'NO', 56),\n",
" ('tr_statement', 'dbo', 'SRC_TS_EARNINGS_PREANNOUNCEMENT', 'S_PROFITNOTICE_CONTENT', -1, 'text', 2147483647, 2147483647, None, None, 1, None, None, -1, None, 2147483647, 5, 'YES', 35),\n",
" ('tr_statement', 'dbo', 'SRC_TS_EARNINGS_PREANNOUNCEMENT', 'S_PROFITNOTICE_STYLE', 12, 'varchar', 20, 20, None, None, 1, None, None, 12, None, 20, 6, 'YES', 39),\n",
" ('tr_statement', 'dbo', 'SRC_TS_EARNINGS_PREANNOUNCEMENT', 'S_PROFITNOTICE_CHANG_SOURCE', 12, 'varchar', 20, 20, None, None, 1, None, None, 12, None, 20, 7, 'YES', 39),\n",
" ('tr_statement', 'dbo', 'SRC_TS_EARNINGS_PREANNOUNCEMENT', 'S_PROFITNOTICE_REASON', -1, 'text', 2147483647, 2147483647, None, None, 1, None, None, -1, None, 2147483647, 8, 'YES', 35),\n",
" ('tr_statement', 'dbo', 'SRC_TS_EARNINGS_PREANNOUNCEMENT', 'REMARK', -1, 'text', 2147483647, 2147483647, None, None, 1, None, None, -1, None, 2147483647, 9, 'YES', 35),\n",
" ('tr_statement', 'dbo', 'SRC_TS_EARNINGS_PREANNOUNCEMENT', 'S_PROFITNOTICE_NETPROFITMAX', 6, 'float', 15, 8, None, 10, 1, None, None, 6, None, None, 10, 'YES', 109),\n",
" ('tr_statement', 'dbo', 'SRC_TS_EARNINGS_PREANNOUNCEMENT', 'S_PROFITNOTICE_NETPROFITUNIT', 12, 'varchar', 20, 20, None, None, 1, None, None, 12, None, 20, 11, 'YES', 39),\n",
" ('tr_statement', 'dbo', 'SRC_TS_EARNINGS_PREANNOUNCEMENT', 'S_PROFITNOTICE_NETPROFITMIN', 6, 'float', 15, 8, None, 10, 1, None, None, 6, None, None, 12, 'YES', 109),\n",
" ('tr_statement', 'dbo', 'SRC_TS_EARNINGS_PREANNOUNCEMENT', 'S_PROFITNOTICE_CHANGEMIN', 6, 'float', 15, 8, None, 10, 1, None, None, 6, None, None, 13, 'YES', 109),\n",
" ('tr_statement', 'dbo', 'SRC_TS_EARNINGS_PREANNOUNCEMENT', 'S_PROFITNOTICE_CHANGEMAX', 6, 'float', 15, 8, None, 10, 1, None, None, 6, None, None, 14, 'YES', 109)]\n"
]
}
],
"source": [
"with engine.connect() as conn:\n",
" rs = conn.execute(\"EXEC sp_columns SRC_TS_EARNINGS_PREANNOUNCEMENT\").fetchall()\n",
"\n",
"pprint(rs)"
]
},
{
"cell_type": "code",
"execution_count": 230,
"id": "e5904cbf-4405-4d2e-bc5b-88706273f5d0",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"[('NE430047', '诺思兰德', 20211231, 20220225, '净利润亏损5174.6万元', '增亏', '天软计算', '本报告期内,公司经营业绩较去年亏损增加。主要原因为:1、报告期内公司研发费用、管理费用较上年度有所增加2、报告期内毛利率较高的技术转让收入减少导致本报告期亏损增加。', 'nan', -5174.6, '万元', -5174.6, -104.1182, -104.1182),\n",
" ('NE430489', '佳先股份', 20211231, 20220228, '净利润盈利6193.94万元比上年同期增长125.26%', '预增', '公告中公布', '本报告期归属于上市公司股东的净利润预计增长125.26%的主要原因如下:\\r\\n1、报告期内公司新厂区生产经营稳定产能逐步放大为公司满足旺盛\\r\\n的市场需求提供了重要保障公司通过不断地拓展销售渠道销售业务实现较好\\r\\n增长同时因疫情、海运费上涨以及原料价格上涨、供求关系紧张等影响 ... (254 characters truncated) ... 和盈利水平得到进一步提升。\\r\\n3、报告期内公司将控股子公司安徽沙丰新材料有限公司全年数据纳入合\\r\\n并报表而上年同期由于收购日为2020年12月8日合并报表中仅合并了安\\r\\n徽沙丰新材料有限公司12月份数据。上述合并范围的差异也导致了公司报告期\\r\\n营业收入、利润较去年同期增长。', 'nan', 6193.94, '万元', 6193.94, 125.26, 125.26),\n",
" ('NE430510', '丰光精密', 20211231, 20220121, '净利润盈利4800万元-5100万元比上年同期增长43.62%-52.6%', '预增', '公告中公布', '(1)报告期内公司下游行业需求旺盛,同时公司市场拓展取得较好效果,公司半导体装备制造类产品和工业自动化类产品营业收入较上年同期增加;(2)公司汽车类产品营业收入较上年同期增加;(3)公司前期开发的新客户、新项目逐步实现量产,相关产品营业收入相应增加。\\r\\n', 'nan', 5100.0, '万元', 4800.0, 43.62, 52.6),\n",
" ('NE430564', '天润科技', 20220331, 20220525, '净利润盈利250万元-320万元比上年同期增长74.14%-122.9%', '预增', '公告中公布', '2022年1月份受西安疫情的影响公司在陕西省内的部分项目完工及验收时间有所推迟公司2022年第一季度收入较上年同期有所下降但由于2021年第一季度确认收入占比84.87%的汕头市中心城区(金平、龙湖区)农村地籍调查服务采购项目毛利率较低导致2022年第一季度公司预计收入规模虽较去年同期有所下降但净利润却较同期有所增长。', 'nan', 320.0, '万元', 250.0, 74.14, 122.9),\n",
" ('NE831370', '新安洁 ', 20211231, 20220225, '净利润盈利4159万元比上年同期下降55.38%', '预减', '公告中公布', '1、公司本报告期增值税及社保支出较上年同期有增长\\r\\n2、公司因应收账款账龄和金额发生变化本报告期的信用减值损失增加\\r\\n3、本报告期公司因部分项目撤场等原因导致资产处置损失增加。\\r\\n', 'nan', 4159.0, '万元', 4159.0, -55.38, -55.38),\n",
" ('NE831689', '克莱特 ', 20211231, 20220321, '净利润盈利4577.19万元比上年同期增长80.24%', '预增', '公告中公布', '公司归属于母公司所有者净利润上升,主要系受行业政策等因素影响,风电\\r\\n新能源装备等下游行业景气度不断提升对公司产品需求增长较快使得公司订\\r\\n单不断增长公司盈利能力进一步提升。\\r\\n受益于国家产业政策的支持国内海上风力发电行业进入快速发展阶段。公\\r\\n司顺势而为积极拓展海上风力 ... (34 characters truncated) ... 合作关系,如空空冷却器、水冷系统等海上风力发电产品。\\r\\n受到上述因素的影响公司新能源装备领域的收入出现了大幅的增长带动公司\\r\\n收入从2020年的28,491.97万元上升至2021年的39,156.58万元,同时带动公司\\r\\n的净利润从2,539.48万元上升至4,577.19万元。', 'nan', 4577.19, '万元', 4577.19, 80.24, 80.24),\n",
" ('NE831689', '克莱特 ', 20220331, 20220302, '净利润盈利900万元-950万元比上年同期增长6.57%-12.49%', '预增', '公告中公布', 'nan', 'nan', 950.0, '万元', 900.0, 6.57, 12.49),\n",
" ('NE831832', '科达自控', 20211231, 20220124, '净利润盈利3600万元-4000万元比上年同期增长23.88%-37.64%', '预增', '公告中公布', '报告期内公司省内外市场拓展取得较好效果,营业收入与上年同期相比有所提升净利润相应增加。', 'nan', 4000.0, '万元', 3600.0, 23.88, 37.64),\n",
" ('NE832145', '恒合股份', 20211231, 20220228, '净利润盈利1670.1万元比上年同期下降45.6%', '预减', '公告中公布', '1、公司油气回收在线监测收入的下降是导致营业收入和净利润降低的主要因素。该业务受疫情等因素的影响下游客户暂停或延缓招标加之疫情的反复\\r\\n使得已经开始的项目有所放缓致使公司油气回收在线监测系统总体收入有所降\\r\\n低进而影响了公司的营收和净利润。\\r\\n2、同时液位量测系统已过政策大力推动期公司液位量测业务量有所下\\r\\n降导致2021年营业收入也随之降低。\\r\\n3、公司预计归属于上市公司股东的扣除非经常性损益后的净利润变动幅度\\r\\n较大主要系公司在2021年度收到计入非经常性损益的上市补助及重大贡献奖\\r\\n励所致。', 'nan', 1670.1, '万元', 1670.1, -45.6, -45.6),\n",
" ('NE832419', '路斯股份', 20211231, 20220223, '净利润盈利2900万元-3500万元比上年同期下降9.72%-25.19%', '预减', '公告中公布', 'nan', 'nan', 3500.0, '万元', 2900.0, -25.19, -9.72)]\n"
]
}
],
"source": [
"with engine.connect() as conn:\n",
" rs = conn.execute(\"select top 10 * from SRC_TS_EARNINGS_PREANNOUNCEMENT\").fetchall()\n",
"\n",
"pprint(rs)"
]
},
{
"cell_type": "code",
"execution_count": 229,
"id": "f6e2ef1d-2e68-49c1-8c57-a0b937037831",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"[('tr_statement', 'dbo', 'EP_WIND', 'WIND_CODE', 12, 'varchar', 13, 13, None, None, 1, None, None, 12, None, 13, 1, 'YES', 39),\n",
" ('tr_statement', 'dbo', 'EP_WIND', 'IntCode', 4, 'int', 10, 4, 0, 10, 1, None, None, 4, None, None, 2, 'YES', 38),\n",
" ('tr_statement', 'dbo', 'EP_WIND', 'ReportPeriod', 4, 'int', 10, 4, 0, 10, 0, None, None, 4, None, None, 3, 'NO', 56),\n",
" ('tr_statement', 'dbo', 'EP_WIND', 'AppearAtDate', 4, 'int', 10, 4, 0, 10, 0, None, None, 4, None, None, 4, 'NO', 56),\n",
" ('tr_statement', 'dbo', 'EP_WIND', 'S_PROFITNOTICE_STYLE', 12, 'varchar', 20, 20, None, None, 1, None, None, 12, None, 20, 5, 'YES', 39),\n",
" ('tr_statement', 'dbo', 'EP_WIND', 'S_PROFITNOTICE_CHANGEMIN', 6, 'float', 15, 8, None, 10, 1, None, None, 6, None, None, 6, 'YES', 109),\n",
" ('tr_statement', 'dbo', 'EP_WIND', 'S_PROFITNOTICE_CHANGEMAX', 6, 'float', 15, 8, None, 10, 1, None, None, 6, None, None, 7, 'YES', 109),\n",
" ('tr_statement', 'dbo', 'EP_WIND', 'S_PROFITNOTICE_NETPROFITMIN', 6, 'float', 15, 8, None, 10, 1, None, None, 6, None, None, 8, 'YES', 109),\n",
" ('tr_statement', 'dbo', 'EP_WIND', 'S_PROFITNOTICE_NETPROFITMAX', 6, 'float', 15, 8, None, 10, 1, None, None, 6, None, None, 9, 'YES', 109),\n",
" ('tr_statement', 'dbo', 'EP_WIND', 'S_PROFITNOTICE_REASON', -1, 'text', 2147483647, 2147483647, None, None, 1, None, None, -1, None, 2147483647, 10, 'YES', 35)]\n"
]
}
],
"source": [
"with engine.connect() as conn:\n",
" rs = conn.execute(\"EXEC sp_columns EP_WIND\").fetchall()\n",
"\n",
"pprint(rs)"
]
},
{
"cell_type": "code",
"execution_count": 42,

@ -28,7 +28,121 @@
},
{
"cell_type": "code",
"execution_count": 40,
"execution_count": 4,
"metadata": {},
"outputs": [],
"source": [
"with engine.connect() as conn:\n",
" stat = \"select [StockID], [date] from [IndexDaily].[dbo].[DailyKLine] group by [StockID], [date]\"\n",
" rs = conn.execute(stat)\n",
" stock_date_list = [(stock_name, date) for stock_name, date in rs.fetchall()]"
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {},
"outputs": [],
"source": [
"import pandas as pd\n",
"df = pd.DataFrame(stock_date_list, columns=['code', 'm_nDate'])"
]
},
{
"cell_type": "code",
"execution_count": 9,
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"<div>\n",
"<style scoped>\n",
" .dataframe tbody tr th:only-of-type {\n",
" vertical-align: middle;\n",
" }\n",
"\n",
" .dataframe tbody tr th {\n",
" vertical-align: top;\n",
" }\n",
"\n",
" .dataframe thead th {\n",
" text-align: right;\n",
" }\n",
"</style>\n",
"<table border=\"1\" class=\"dataframe\">\n",
" <thead>\n",
" <tr style=\"text-align: right;\">\n",
" <th></th>\n",
" <th></th>\n",
" </tr>\n",
" <tr>\n",
" <th>code</th>\n",
" <th>m_nDate</th>\n",
" </tr>\n",
" </thead>\n",
" <tbody>\n",
" <tr>\n",
" <th rowspan=\"5\" valign=\"top\">CSI000140</th>\n",
" <th>20120912</th>\n",
" </tr>\n",
" <tr>\n",
" <th>20120913</th>\n",
" </tr>\n",
" <tr>\n",
" <th>20120914</th>\n",
" </tr>\n",
" <tr>\n",
" <th>20120917</th>\n",
" </tr>\n",
" <tr>\n",
" <th>20120918</th>\n",
" </tr>\n",
" <tr>\n",
" <th>...</th>\n",
" <th>...</th>\n",
" </tr>\n",
" <tr>\n",
" <th rowspan=\"5\" valign=\"top\">SZ399995</th>\n",
" <th>20220829</th>\n",
" </tr>\n",
" <tr>\n",
" <th>20220830</th>\n",
" </tr>\n",
" <tr>\n",
" <th>20220831</th>\n",
" </tr>\n",
" <tr>\n",
" <th>20220901</th>\n",
" </tr>\n",
" <tr>\n",
" <th>20220902</th>\n",
" </tr>\n",
" </tbody>\n",
"</table>\n",
"<p>403101 rows × 0 columns</p>\n",
"</div>"
],
"text/plain": [
"Empty DataFrame\n",
"Columns: []\n",
"Index: [(CSI000140, 20120912), (CSI000140, 20120913), (CSI000140, 20120914), (CSI000140, 20120917), (CSI000140, 20120918), (CSI000140, 20120919), (CSI000140, 20120920), (CSI000140, 20120921), (CSI000140, 20120924), (CSI000140, 20120925), (CSI000140, 20120926), (CSI000140, 20120927), (CSI000140, 20120928), (CSI000140, 20121008), (CSI000140, 20121009), (CSI000140, 20121010), (CSI000140, 20121011), (CSI000140, 20121012), (CSI000140, 20121015), (CSI000140, 20121016), (CSI000140, 20121017), (CSI000140, 20121018), (CSI000140, 20121019), (CSI000140, 20121022), (CSI000140, 20121023), (CSI000140, 20121024), (CSI000140, 20121025), (CSI000140, 20121026), (CSI000140, 20121029), (CSI000140, 20121030), (CSI000140, 20121031), (CSI000140, 20121101), (CSI000140, 20121102), (CSI000140, 20121105), (CSI000140, 20121106), (CSI000140, 20121107), (CSI000140, 20121108), (CSI000140, 20121109), (CSI000140, 20121112), (CSI000140, 20121113), (CSI000140, 20121114), (CSI000140, 20121115), (CSI000140, 20121116), (CSI000140, 20121119), (CSI000140, 20121120), (CSI000140, 20121121), (CSI000140, 20121122), (CSI000140, 20121123), (CSI000140, 20121126), (CSI000140, 20121127), (CSI000140, 20121128), (CSI000140, 20121129), (CSI000140, 20121130), (CSI000140, 20121203), (CSI000140, 20121204), (CSI000140, 20121205), (CSI000140, 20121206), (CSI000140, 20121207), (CSI000140, 20121210), (CSI000140, 20121211), (CSI000140, 20121212), (CSI000140, 20121213), (CSI000140, 20121214), (CSI000140, 20121217), (CSI000140, 20121218), (CSI000140, 20121219), (CSI000140, 20121220), (CSI000140, 20121221), (CSI000140, 20121224), (CSI000140, 20121225), (CSI000140, 20121226), (CSI000140, 20121227), (CSI000140, 20121228), (CSI000140, 20121231), (CSI000140, 20130104), (CSI000140, 20130107), (CSI000140, 20130108), (CSI000140, 20130109), (CSI000140, 20130110), (CSI000140, 20130111), (CSI000140, 20130115), (CSI000140, 20130116), (CSI000140, 20130117), (CSI000140, 20130118), (CSI000140, 20130121), (CSI000140, 20130122), (CSI000140, 
20130123), (CSI000140, 20130124), (CSI000140, 20130125), (CSI000140, 20130128), (CSI000140, 20130129), (CSI000140, 20130130), (CSI000140, 20130131), (CSI000140, 20130201), (CSI000140, 20130204), (CSI000140, 20130205), (CSI000140, 20130206), (CSI000140, 20130207), (CSI000140, 20130208), (CSI000140, 20130218), ...]\n",
"\n",
"[403101 rows x 0 columns]"
]
},
"execution_count": 9,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"df.set_index(['code', 'm_nDate']).sort_index()"
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {},
"outputs": [],
"source": [
@ -37,16 +151,16 @@
},
{
"cell_type": "code",
"execution_count": 45,
"execution_count": 7,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"4843"
"5015"
]
},
"execution_count": 45,
"execution_count": 7,
"metadata": {},
"output_type": "execute_result"
}

@ -0,0 +1,173 @@
{
"cells": [
{
"cell_type": "code",
"execution_count": 1,
"id": "118641f9-96c3-4fd3-aea2-7ecdae17492e",
"metadata": {},
"outputs": [],
"source": [
"import dolphindb as ddb\n",
"\n",
"sess = ddb.session('localhost', 8848)\n",
"sess.login('admin', '123456')"
]
},
{
"cell_type": "code",
"execution_count": 4,
"id": "44e3d35e-2e84-44fc-a3d2-00eaa0135460",
"metadata": {},
"outputs": [],
"source": [
"df = sess.run(\"\"\"\n",
" select code from loadTable(\"dfs://daily_stock_ts\", \"daily_kline\")\n",
" group by code order by code asc\n",
"\"\"\")\n",
"\n",
"df[\"entity_id\"] = df.index\n",
"df.set_index(\"code\", inplace=True)"
]
},
{
"cell_type": "code",
"execution_count": 5,
"id": "20a6066d-8efb-40fd-8a3b-3d848c8c0073",
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"<div>\n",
"<style scoped>\n",
" .dataframe tbody tr th:only-of-type {\n",
" vertical-align: middle;\n",
" }\n",
"\n",
" .dataframe tbody tr th {\n",
" vertical-align: top;\n",
" }\n",
"\n",
" .dataframe thead th {\n",
" text-align: right;\n",
" }\n",
"</style>\n",
"<table border=\"1\" class=\"dataframe\">\n",
" <thead>\n",
" <tr style=\"text-align: right;\">\n",
" <th></th>\n",
" <th>entity_id</th>\n",
" </tr>\n",
" <tr>\n",
" <th>code</th>\n",
" <th></th>\n",
" </tr>\n",
" </thead>\n",
" <tbody>\n",
" <tr>\n",
" <th>000001.SZ</th>\n",
" <td>0</td>\n",
" </tr>\n",
" <tr>\n",
" <th>000002.SZ</th>\n",
" <td>1</td>\n",
" </tr>\n",
" <tr>\n",
" <th>000004.SZ</th>\n",
" <td>2</td>\n",
" </tr>\n",
" <tr>\n",
" <th>000005.SZ</th>\n",
" <td>3</td>\n",
" </tr>\n",
" <tr>\n",
" <th>000006.SZ</th>\n",
" <td>4</td>\n",
" </tr>\n",
" <tr>\n",
" <th>...</th>\n",
" <td>...</td>\n",
" </tr>\n",
" <tr>\n",
" <th>871970.NE</th>\n",
" <td>5010</td>\n",
" </tr>\n",
" <tr>\n",
" <th>871981.NE</th>\n",
" <td>5011</td>\n",
" </tr>\n",
" <tr>\n",
" <th>872925.NE</th>\n",
" <td>5012</td>\n",
" </tr>\n",
" <tr>\n",
" <th>873169.NE</th>\n",
" <td>5013</td>\n",
" </tr>\n",
" <tr>\n",
" <th>873223.NE</th>\n",
" <td>5014</td>\n",
" </tr>\n",
" </tbody>\n",
"</table>\n",
"<p>5015 rows × 1 columns</p>\n",
"</div>"
],
"text/plain": [
" entity_id\n",
"code \n",
"000001.SZ 0\n",
"000002.SZ 1\n",
"000004.SZ 2\n",
"000005.SZ 3\n",
"000006.SZ 4\n",
"... ...\n",
"871970.NE 5010\n",
"871981.NE 5011\n",
"872925.NE 5012\n",
"873169.NE 5013\n",
"873223.NE 5014\n",
"\n",
"[5015 rows x 1 columns]"
]
},
"execution_count": 5,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"df"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "cc934d77-3cc1-458f-9e28-3f0715e9b87b",
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.8.13"
}
},
"nbformat": 4,
"nbformat_minor": 5
}

@ -1,6 +1,6 @@
import dolphindb as ddb
from pprint import pprint
class DDBBase(object):
@ -11,7 +11,10 @@ class DDBBase(object):
}
def __init__(self, host=None, **kwargs):
    """Open a DolphinDB session and log in with the configured account.

    Args:
        host: Optional server host. When given, it replaces
            ``ddb_config['host']`` for this instance only.
        **kwargs: Accepted and ignored by this constructor.
            NOTE(review): presumably present so subclasses can forward
            extra options through ``super().__init__(**kwargs)`` — confirm.
    """
    if host is not None:
        # Rebind a per-instance copy rather than writing into the existing
        # dict. NOTE(review): `ddb_config` appears to be defined at class
        # level, so an in-place write would leak this override into every
        # instance created afterwards — confirm against the class body.
        self.ddb_config = {**self.ddb_config, 'host': host}

    # The port is fixed at 8848; only the host is configurable.
    self.ddb_sess = ddb.session(self.ddb_config['host'], 8848)
    self.ddb_sess.login(self.ddb_config['username'], self.ddb_config['password'])

@ -18,8 +18,7 @@ import dolphindb as ddb
import dolphindb.settings as keys
import sqlalchemy as sa
import ProtoBuffEntitys
from .DDBLoader import DDBLoader
class DDBDailyLoader(DDBLoader):
@ -45,6 +44,8 @@ class DDBDailyLoader(DDBLoader):
'DOUBLE'
]
memory_table_name = 'daily_kline_mem'
partition_table_name = 'daily_kline'
def create_ddb_database(self):
# TODO: daily数据库已经在DDBDailyFactor中被创建了
@ -62,7 +63,7 @@ class DDBDailyLoader(DDBLoader):
print('Did load database from', self.ddb_path)
def create_ddb_partition_table(self, memory_table_name, partition_table_name):
def create_ddb_partition_table(self):
# TODO: 现在只做一个日频行情数据表,今后可能考虑把基本面数据也迁移过来
# 由于日频行情数据的表结构相对简单,所以直接把表结构写在这里代码里即可
@ -84,37 +85,35 @@ class DDBDailyLoader(DDBLoader):
""".format(
ddb_daily_path = self.ddb_path,
ddb_daily_dbname = self.ddb_dbname,
memory_table_name = memory_table_name,
partition_table_name = partition_table_name,
memory_table_name = self.memory_table_name,
partition_table_name = self.partition_table_name,
))
def create_ddb_memory_table(self, capacity):
    """Create an empty DolphinDB in-memory table describing the table layout.

    The table is bound, inside the DolphinDB session, to the variable named
    by ``self.memory_table_name``. Its columns and types come from
    ``self.daily_kline_cols`` and ``self.daily_kline_col_types``.

    Args:
        capacity: Initial capacity passed to DolphinDB's ``table`` call;
            the table is created with zero rows.
    """
    # DolphinDB writes a list of column names as `a`b`c.
    col_names = '`' + '`'.join(self.daily_kline_cols)
    col_types = ', '.join(self.daily_kline_col_types)

    self.ddb_sess.run("""
        // 先创建一个空的内存表用来表征结构如果无需插入数据capacity可以设为10
        {memory_table_name} = table({capacity}:0, {col_names}, [{col_types}]);
    """.format(
        memory_table_name = self.memory_table_name,
        capacity = capacity,
        col_names = col_names,
        col_types = col_types,
    ))
def dump_daily_kline_to_ddb(self):
def dump_to_ddb(self):
# 先创建一个分区表,然后再逐个股票的数据插入
# 1. 需要额外控制在插入第一个股票数据的时候创建分区表比较麻烦
# 2. python程序中的dataframe直接上传到dolphindb内存表不需要考虑内存表字段类型分区表中设置好即可
memory_table_name = 'daily_kline_mem'
partition_table_name = 'daily_kline'
self.create_ddb_memory_table(memory_table_name, 10)
self.create_ddb_memory_table(10)
print('Did create ddb memory table.')
pprint(self.ddb_sess.run(f"schema({memory_table_name})"))
self.create_ddb_partition_table(memory_table_name, partition_table_name)
pprint(self.ddb_sess.run(f"schema({self.memory_table_name})"))
self.create_ddb_partition_table()
print('Did create ddb partition table.')
pprint(self.ddb_sess.run(f"schema({partition_table_name})"))
pprint(self.ddb_sess.run(f"schema({self.partition_table_name})"))
with self.mssql_engine.connect() as conn:
stat = "select distinct [StockID] from [StockDaily].dbo.[DailyKLine]"
@ -137,22 +136,131 @@ class DDBDailyLoader(DDBLoader):
df = pd.DataFrame(row_list)
df['date'] = DDBLoader.make_date(df['date'])
df['StockID'] = DDBLoader.tscode_to_windcode(df['StockID'])
self.ddb_sess.upload({memory_table_name : df})
self.ddb_sess.upload({self.memory_table_name : df})
#print('Did upload dataframe to ddb.')
#pprint(self.ddb_sess.run(f"schema({memory_table_name})"))
#pprint(self.ddb_sess.run(f"schema({self.memory_table_name})"))
#break
self.ddb_sess.run(f"{partition_table_name}.tableInsert({memory_table_name})")
self.ddb_sess.run(f"{self.partition_table_name}.tableInsert({self.memory_table_name})")
class DDBDailyFactorLoader(DDBDailyLoader):
    """Compute daily factors in DolphinDB and insert them into ``daily_factor``.

    The attribute names ``daily_kline_cols`` / ``daily_kline_col_types`` are
    kept even though they describe factor columns here, because the
    inherited ``create_ddb_memory_table`` reads them to build the table
    layout.
    """

    daily_kline_cols = [
        'code', 'm_nDate',
        # Four factors combining price trend with trading volume
        'trend_with_turnover', 'trend_with_amount',
        "abs_trend_with_turnover", "abs_trend_with_amount",
        # Price/volume divergence factor from Alpha101
        "alpha101_22"
    ]

    daily_kline_col_types = [
        'SYMBOL', 'DATE',
        'DOUBLE', 'DOUBLE',
        'DOUBLE', 'DOUBLE',
        'DOUBLE'
    ]

    memory_table_name = 'daily_factor_mem'
    partition_table_name = 'daily_factor'

    def dump_to_ddb(self):
        """Create the factor tables, compute every factor and insert the rows."""
        self.create_ddb_memory_table(10)
        print('Did create ddb memory table.')
        pprint(self.ddb_sess.run(f"schema({self.memory_table_name})"))

        self.create_ddb_partition_table()
        print('Did create ddb partition table.')
        pprint(self.ddb_sess.run(f"schema({self.partition_table_name})"))

        df_list = [
            self.alpha101_22,
            self.trend_with_amount,
        ]

        # Both frames are indexed by (code, m_nDate); align them side by side.
        df = pd.concat(df_list, axis=1)
        df.reset_index(inplace=True)

        # The concatenation puts `alpha101_22` first, while the table layout
        # declared in `daily_kline_cols` puts it last. DolphinDB's tableInsert
        # fills columns by position, so reorder to the declared layout;
        # otherwise factor values would be stored under the wrong column.
        # A missing column raises KeyError here instead of inserting bad data.
        df = df[self.daily_kline_cols]

        print('Did prepare the dataframe for insertion:')
        print(df.head())

        self.ddb_sess.upload({"tbl": df})
        self.ddb_sess.run("tableInsert(loadTable('dfs://daily_stock_ts', 'daily_factor'), tbl)")
        pprint("Did dump data to partition table.")

    @property
    def alpha101_22(self):
        """Run the Alpha101 #22 script and return its result as a DataFrame.

        Returns:
            DataFrame indexed by (code, m_nDate) with one ``alpha101_22``
            column. Every access re-runs the script on the server.
        """
        # NOTE(review): the rank column in `rank_vol_20` has no alias; the
        # final select relies on DolphinDB naming it `rank_vol_20`
        # automatically — confirm.
        sql = """
            vol_20 = select code, m_nDate, mstdp(close, 20, 9) as vol_20
            from loadTable("dfs://daily_stock_ts", "daily_kline")
            context by code;

            rank_vol_20 = select code, m_nDate, rank(vol_20, tiesMethod='average', percent=true)
            from vol_20 context by m_nDate;

            corr_5 = select code, m_nDate, mcorr(high, vol, 5) as corr_5
            from loadTable("dfs://daily_stock_ts", "daily_kline")
            context by code;

            delta_corr_5 = select code, m_nDate, mfirst(corr_5, 5) - corr_5 as delta_corr_5
            from corr_5 context by code;

            alpha101_22 = select code, m_nDate, delta_corr_5 * rank_vol_20 as alpha101_22
            from ej(rank_vol_20, delta_corr_5, `code`m_nDate);

            alpha101_22;
        """
        df = self.ddb_sess.run(sql)
        df.set_index(["code", "m_nDate"], inplace=True)
        return df

    @property
    def trend_with_amount(self):
        """Compute the four trend-with-volume factors in a single query.

        Returns:
            DataFrame indexed by (code, m_nDate) with the columns
            ``trend_with_turnover``, ``trend_with_amount``,
            ``abs_trend_with_turnover`` and ``abs_trend_with_amount``.
        """
        # Insertion order of this dict fixes the column order of the result.
        factor_def = {
            "trend_with_turnover": "(rank(amount/MarketValues, tiesMethod='average', percent=true) - 0.5) * winsorize(PctChg/20, 0.05)",
            "trend_with_amount": "(rank(amount, tiesMethod='average', percent=true) - 0.5) * winsorize(PctChg/20, 0.05)",
            "abs_trend_with_turnover": "(rank(amount/MarketValues, tiesMethod='average', percent=true) - 0.5) * abs(winsorize(PctChg/20, 0.05))",
            "abs_trend_with_amount": "(rank(amount, tiesMethod='average', percent=true) - 0.5) * abs(winsorize(PctChg/20, 0.05))",
        }

        cols = ", ".join(
            f"{expression} as {factor_name}"
            for factor_name, expression in factor_def.items()
        )

        sql = f"""
            select code, m_nDate, {cols}
            from loadTable("dfs://daily_stock_ts", "daily_kline")
            context by m_nDate
        """
        print("factor sql for trend with amount: " + sql)

        df = self.ddb_sess.run(sql)
        df.set_index(["code", "m_nDate"], inplace=True)
        return df
def main():
    """Load the daily database handle and dump the daily factor table."""
    # TODO: wrap this entry point with the `Fire` library so that running it
    # with different arguments no longer requires editing several places in
    # the code below.

    # Daily factor data. Only one loader is constructed: building a
    # DDBDailyLoader first and then overwriting it would create a loader that
    # is never used. Swap in DDBDailyLoader() here to load the raw daily
    # k-line table instead.
    loader = DDBDailyFactorLoader()
    loader.load_ddb_database()
    loader.dump_to_ddb()
if __name__ == '__main__':

@ -0,0 +1,12 @@
from .DDBLoader import DDBLoader
class DDBEntityLoader(DDBLoader):
    """Skeleton loader pointing at the daily stock time-series database."""

    # DolphinDB database location and the handle name used for it.
    ddb_path = "dfs://daily_stock_ts"
    ddb_dbname = "db_daily_stock_ts"

    def __init__(self, dtype, **kwargs):
        """Accept ``dtype`` and any extra options without acting on them.

        NOTE(review): the base-class constructor is not called and ``dtype``
        is not stored, so whatever the base constructor would set up is
        absent on instances of this class — confirm this is an intentional
        stub.
        """

@ -275,20 +275,6 @@ class DDBHFTLoader(DDBLoader):
print('Did create the persistent table with the memory table')
def make_calendar_df(self):
print('Will create calendar dataframe from SQL Server')
# 从KLine表查询主要是因为KLine表最小
with self.mssql_engine.connect() as conn:
stat = "select distinct S_INFO_WINDCODE, TRADE_DT from Level2BytesKline.dbo.KLine"
rs = conn.execute(stat)
stock_date_list = [(stock_name, date) for stock_name, date in rs.fetchall()]
df_calendar = pd.DataFrame(stock_date_list, columns=['code', 'm_nDate'])
df_calendar['m_nDate'] = self.make_date(df_calendar['m_nDate'])
print('Did make the DataFrame for calendar')
return df_calendar
def _make_table_skeleton(self, hft_type_name, table_capacity=default_table_capacity):
def _make_tbl_config(field_list):

@ -0,0 +1,286 @@
import pickle
import functools
import warnings
from pprint import pprint
from pathlib import Path
from tqdm import tqdm
from multiprocessing import Pool
import numpy as np
import pandas as pd
import dolphindb as ddb
import dolphindb.settings as keys
import sqlalchemy as sa
from .DDBLoader import DDBLoader
class DDBIndexLoader(DDBLoader):
    """Load index data from SQL Server into DolphinDB partitioned tables.

    The ``dtype`` argument selects what is loaded:

    - ``"concept"``: a one-hot stock-to-index membership table with one BOOL
      column per ``IndexID`` found in ``[IndexInfo].[dbo].[Constituents]``.
    - ``"kline"``: daily K-line fields read from
      ``[IndexDaily].[dbo].[DailyKLine]``.
    """

    ddb_path = "dfs://daily_stock_ts"
    ddb_dbname = "db_daily_stock_ts"

    def __init__(self, dtype, **kwargs):
        """Select the table names for ``dtype`` and build fields and calendar.

        Args:
            dtype: ``"concept"`` or ``"kline"``.
            **kwargs: forwarded unchanged to the parent constructor.

        Raises:
            NotImplementedError: if ``dtype`` is neither of the two values.
        """
        # TODO: later versions of the parent constructor may take more settings.
        super().__init__(**kwargs)
        self.dtype = dtype
        if dtype == "concept":
            self.mem_tbl_name = "mem_idx_daily_concept"
            self.part_tbl_name = "idx_daily_concept"
        elif dtype == "kline":
            self.mem_tbl_name = "mem_idx_daily_kline"
            self.part_tbl_name = "idx_daily_kline"
        else:
            raise NotImplementedError(f"Unsupported `dtype` argument: {dtype}")

        self.make_fields()
        self.make_calendar_df()

    def make_fields(self):
        """Populate ``self.fields`` with the value columns of the table."""
        if self.dtype == "concept":
            # One column per distinct IndexID in the constituents table.
            with self.mssql_engine.connect() as conn:
                rs = conn.execute("select IndexID from [IndexInfo].[dbo].[Constituents] group by IndexID")
                self.fields = [index_id for (index_id,) in rs.fetchall()]
        elif self.dtype == "kline":
            self.fields = ['open', 'high', 'low', 'close', 'vol', 'amount', 'yclose']

    def make_calendar_df(self):
        """Build ``self.df_calendar`` with columns ``code`` and ``m_nDate``.

        The (StockID, date) pairs come from the daily K-line tables: the stock
        table for ``"concept"`` and the index table for ``"kline"``.

        Raises:
            NotImplementedError: if ``self.dtype`` is not supported.
        """
        if self.dtype == "concept":
            stat = "select [StockID], [date] from [StockDaily].[dbo].[DailyKLine] group by [StockID], [date]"
        elif self.dtype == "kline":
            stat = "select [StockID], [date] from [IndexDaily].[dbo].[DailyKLine] group by [StockID], [date]"
        else:
            raise NotImplementedError(f"Unsupported dtype: {self.dtype}")

        with self.mssql_engine.connect() as conn:
            rs = conn.execute(stat)
            stock_date_list = [(stock_name, date) for stock_name, date in rs.fetchall()]

        self.df_calendar = pd.DataFrame(stock_date_list, columns=['code', 'm_nDate'])
        self.df_calendar['m_nDate'] = self.make_date(self.df_calendar['m_nDate'])
        self.df_calendar['code'] = self.tscode_to_windcode(self.df_calendar['code'])
        print('Did make the DataFrame for calendar')
        print(self.df_calendar.head())

    def load_ddb_database(self):
        """Bind the database at ``ddb_path`` to ``ddb_dbname`` in the session."""
        self.ddb_sess.run("""
            {dbName} = database(directory='{dbPath}')
        """.format(
            dbName = self.ddb_dbname,
            dbPath = self.ddb_path
        ))
        print('Did load database from', self.ddb_path)

    def create_ddb_partition_table(self):
        """Create the partitioned table, dropping an existing one first.

        An empty in-memory table is created as the schema template. The
        partitioned table is partitioned by ``code``, sorted by ``code`` and
        ``m_nDate``, and uses delta compression for ``m_nDate``.
        """
        if self.dtype == "concept":
            self._create_ddb_memory_table_concept()
        elif self.dtype == "kline":
            self._create_ddb_memory_table_kline()
        pprint(f"Did create memory table: {self.mem_tbl_name}")

        if self.ddb_sess.existsTable(self.ddb_path, self.part_tbl_name):
            pprint(f"Will drop partition table: {self.part_tbl_name}")
            self.ddb_sess.dropTable(self.ddb_path, self.part_tbl_name)

        self.ddb_sess.run("""
            {part_tbl_name} = {ddb_dbname}.createPartitionedTable(
                table = {mem_tbl_name},
                tableName = `{part_tbl_name},
                partitionColumns = `code,
                sortColumns = `code`m_nDate,
                compressMethods = {{m_nDate:"delta"}}
            )
        """.format(
            ddb_dbname = self.ddb_dbname,
            part_tbl_name = self.part_tbl_name,
            mem_tbl_name = self.mem_tbl_name
        ))

    def _create_ddb_memory_table(self, value_type):
        """Create the empty in-memory template table.

        Columns are ``code`` (SYMBOL), ``m_nDate`` (DATE), then one
        ``value_type`` column per entry of ``self.fields``.
        """
        col_name_list = ['code', 'm_nDate'] + list(self.fields)
        col_type_list = ['SYMBOL', 'DATE'] + [value_type] * len(self.fields)
        code = """
            {mem_tbl_name} = table(
                {capacity}:0,
                {col_names},
                [{col_types}]
            );
        """.format(
            mem_tbl_name = self.mem_tbl_name,
            capacity = 10,
            col_names = '`' + '`'.join(col_name_list),
            col_types = ','.join(col_type_list)
        )
        pprint(f"Will create mem table by:\n{code}")
        self.ddb_sess.run(code)

    def _create_ddb_memory_table_concept(self):
        """Create the template table for concept data (BOOL value columns)."""
        self._create_ddb_memory_table('BOOL')

    def _create_ddb_memory_table_kline(self):
        """Create the template table for K-line data (DOUBLE value columns)."""
        self._create_ddb_memory_table('DOUBLE')

    @staticmethod
    def _windcode_to_tscode(wind_code):
        """Convert a code such as ``000300.SH`` into the form ``SH000300``."""
        return wind_code[-2:] + wind_code[:-3]

    def _append_to_partition_table(self, df):
        """Upload ``df`` as the in-memory table and append it to the partitioned table."""
        self.ddb_sess.upload({self.mem_tbl_name : df})
        self.ddb_sess.run("""
            append!(loadTable('{dbPath}', `{part_tbl_name}), {mem_tbl_name})
        """.format(
            dbPath = self.ddb_path,
            part_tbl_name = self.part_tbl_name,
            mem_tbl_name = self.mem_tbl_name
        ))

    def _make_idx_daily_kline(self):
        """Yield ``(wind_code, DataFrame)`` with the daily K-line of each code.

        Each DataFrame is indexed by ``(code, m_nDate)`` and holds the columns
        listed in ``self.fields``.
        """
        columns = ['code', 'm_nDate'] + self.fields
        field_list = ','.join([f"[{field}]" for field in (['StockID', 'date'] + self.fields)])

        # Only the group keys are needed; the per-code calendar rows are not used here.
        with tqdm(self.df_calendar.groupby('code')) as pbar:
            for wind_code, _ in pbar:
                pbar.set_description(f"Will work on {wind_code}")
                # The SQL Server tables are keyed by the exchange-prefixed code.
                ts_code = self._windcode_to_tscode(wind_code)

                with self.mssql_engine.connect() as conn:
                    code = """
                        select
                            {field_list}
                        from
                            [IndexDaily].[dbo].[DailyKLine]
                        where
                            StockID='{index_id}'
                    """.format(
                        field_list = field_list,
                        index_id = ts_code
                    )
                    rs = conn.execute(code)
                    row_list = rs.fetchall()

                df = pd.DataFrame(row_list, columns=columns)
                df['code'] = self.tscode_to_windcode(df['code'])
                df['m_nDate'] = self.make_date(df['m_nDate'])
                df.set_index(['code', 'm_nDate'], inplace=True)
                yield wind_code, df

    def dump_idx_daily_kline_to_ddb(self):
        """Append the daily K-line of every code to the partitioned table."""
        for _, df in self._make_idx_daily_kline():
            self._append_to_partition_table(df.reset_index())

    @staticmethod
    def _mark_stock2concept_onehot(df_stock2concept, concept_id, start_date, end_date):
        """Set the membership flag to True for dates in ``[start_date, end_date]``.

        Args:
            df_stock2concept: one-hot DataFrame (or a single column Series)
                whose index has an ``m_nDate`` level; modified in place.
            concept_id: column to mark when a DataFrame is given.
            start_date: entry date in ``YYYYMMDD`` form; always provided.
            end_date: exit date in ``YYYYMMDD`` form. ``None`` or ``0`` means
                the exit date is missing, which generally indicates that the
                stock is still a member, so every date from ``start_date``
                onward is marked.
        """
        dates = df_stock2concept.index.get_level_values('m_nDate')
        in_range = dates >= pd.to_datetime(str(start_date), format='%Y%m%d')
        if not (end_date is None or end_date == 0):
            in_range = in_range & (dates <= pd.to_datetime(str(end_date), format='%Y%m%d'))

        if isinstance(df_stock2concept, pd.DataFrame):
            # Write through the frame itself: assigning into a column extracted
            # with df[col] may only modify a temporary copy.
            df_stock2concept.loc[in_range, concept_id] = True
        else:
            df_stock2concept.loc[in_range] = True

    def _make_stock2concept_onehot(self):
        """Yield ``(wind_code, DataFrame)`` one-hot membership tables per stock.

        Each DataFrame is indexed by ``(code, m_nDate)`` taken from the
        calendar and has one boolean column per entry of ``self.fields``.
        """
        concept_list = self.fields

        # The calendar codes were converted by `tscode_to_windcode`; group by code.
        with tqdm(self.df_calendar.groupby('code')) as pbar:
            for wind_code, df_calendar_stock in pbar:
                pbar.set_description(f"Will work on {wind_code}")
                # The constituents table is keyed by the exchange-prefixed code.
                ts_code = self._windcode_to_tscode(wind_code)

                # Long-to-wide: the concept ids become the column names and
                # every cell starts as False before the membership ranges are marked.
                df_stock2concept = pd.DataFrame(
                    False,
                    index=df_calendar_stock.set_index(['code', 'm_nDate']).index,
                    columns=concept_list,
                    dtype="bool"
                )

                # Read the entry/exit dates of every concept this stock belongs to.
                with self.mssql_engine.connect() as conn:
                    code = """
                        select
                            SecId, IndexID, EnterDate, ExitDate
                        from
                            [IndexInfo].[dbo].[Constituents]
                        where
                            SecID='{stock_id}'
                    """.format(
                        stock_id = ts_code
                    )
                    rs = conn.execute(code)
                    row_list = rs.fetchall()

                # Mark the one-hot positions one membership record at a time.
                for (_, concept_id, start_date, end_date) in row_list:
                    self._mark_stock2concept_onehot(
                        df_stock2concept,
                        concept_id,
                        start_date, end_date
                    )

                yield wind_code, df_stock2concept

    def dump_idx_concept_to_ddb(self):
        """Append the one-hot membership table of every stock to the partitioned table."""
        for _, df in self._make_stock2concept_onehot():
            self._append_to_partition_table(df.reset_index())

@ -0,0 +1,52 @@
from DDBIndexLoader import DDBIndexLoader
class DDBIndexLoaderWind(DDBIndexLoader):
    """Variant of ``DDBIndexLoader`` that writes to ``*_wind`` tables.

    For ``"concept"`` data the columns come from ``[WIND_SEC_CODE]`` instead
    of ``IndexID``.
    """

    def __init__(self, dtype, **kwargs):
        """Initialise the parent loader, then switch to the Wind table names.

        Args:
            dtype: ``"concept"`` or ``"kline"``.
            **kwargs: forwarded unchanged to the parent constructor.

        Raises:
            NotImplementedError: if ``dtype`` is neither of the two values.
        """
        # The parent constructor requires `dtype`. It validates the value and
        # already calls `make_fields()` and `make_calendar_df()`, which
        # dispatch to the overrides below, so they are not called again here.
        super().__init__(dtype, **kwargs)
        if dtype == "concept":
            self.mem_tbl_name = "mem_idx_daily_concept_wind"
            self.part_tbl_name = "idx_daily_concept_wind"
        elif dtype == "kline":
            self.mem_tbl_name = "mem_idx_daily_kline_wind"
            self.part_tbl_name = "idx_daily_kline_wind"
        else:
            raise NotImplementedError(f"Unsupported `dtype` argument: {dtype}")

    def make_fields(self):
        """Populate ``self.fields`` with the value columns of the table."""
        if self.dtype == "concept":
            with self.mssql_engine.connect() as conn:
                # Group by the selected column: SQL Server rejects a select-list
                # column that is neither aggregated nor part of GROUP BY.
                # NOTE(review): assumes [WIND_SEC_CODE] identifies the index - confirm against the table schema.
                rs = conn.execute("select [WIND_SEC_CODE] from [IndexInfo].[dbo].[Constituents] group by [WIND_SEC_CODE]")
                self.fields = [index_id for (index_id,) in rs.fetchall()]
        elif self.dtype == "kline":
            self.fields = ['open', 'high', 'low', 'close', 'vol', 'amount', 'yclose']

    def make_calendar_df(self):
        """Build ``self.df_calendar`` exactly as the parent class does.

        The calendar is built from the same daily K-line tables, so the
        parent implementation is reused instead of being duplicated here.
        """
        super().make_calendar_df()

@ -34,8 +34,8 @@ class DDBLoader(DDBBase):
'password' : 'passw0rd!'
}
def __init__(self):
super().__init__()
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.mssql_engine = sa.create_engine(
"mssql+pyodbc://{username}:{password}@{host}/master?driver=ODBC+Driver+18+for+SQL+Server".format(**self.mssql_config),
connect_args = {
@ -96,5 +96,16 @@ class DDBLoader(DDBBase):
return pd.to_timedelta(s_hr + s_min + s_sec + s_ms, unit='ms')
def make_calendar_df(self):
    """Build the (code, date) calendar DataFrame from SQL Server.

    Returns:
        A DataFrame with columns ``code`` and ``m_nDate``, where
        ``m_nDate`` has been converted by ``self.make_date``.
    """
    print('Will create calendar dataframe from SQL Server')
    # The KLine table is queried mainly because it is the smallest table.
    query = "select distinct S_INFO_WINDCODE, TRADE_DT from Level2BytesKline.dbo.KLine"
    with self.mssql_engine.connect() as conn:
        rows = conn.execute(query).fetchall()

    pairs = []
    for stock_name, date in rows:
        pairs.append((stock_name, date))

    calendar = pd.DataFrame(pairs, columns=['code', 'm_nDate'])
    calendar['m_nDate'] = self.make_date(calendar['m_nDate'])
    print('Did make the DataFrame for calendar')
    return calendar

@ -2,6 +2,21 @@
from loader.DDBPITLoader import DDBPITLoader
from loader.DDBHFTLoader import DDBHFTLoader
from loader.DDBBasicInfoLoader import DDBBasicInfoLoader
from loader.DDBIndexLoader import DDBIndexLoader
from loader.DDBDailyLoader import DDBDailyLoader, DDBDailyFactorLoader
def create_index_data():
    """Create the index daily K-line table in DolphinDB and fill it."""
    index_loader = DDBIndexLoader(dtype="kline", host='localhost')
    index_loader.load_ddb_database()

    # NOTE: the sector/concept membership export is currently disabled;
    # only the index daily K-line data is produced below.
    index_loader.create_ddb_partition_table()
    index_loader.dump_idx_daily_kline_to_ddb()
def create_hft_data():
@ -37,7 +52,13 @@ def create_daily_kline_data():
# 日频行情数据
loader = DDBDailyLoader()
loader.load_ddb_database()
loader.dump_daily_kline_to_ddb()
loader.dump_to_ddb()
def create_daily_factor_data():
    """Load the DolphinDB database and dump the daily factor data into it."""
    factor_loader = DDBDailyFactorLoader(host="localhost")
    factor_loader.load_ddb_database()
    factor_loader.dump_to_ddb()
@ -45,8 +66,9 @@ def main():
# TODO:
# 可以使用`Fire`库,对函数调用再做一次封装,就可以避免每次运行不同参数时候需要修改内部多处的代码。
create_hft_data()
create_daily_factor_data()
#create_index_data()
#create_hft_data()
#create_pit_data()

Loading…
Cancel
Save