1. Re-organize the directory structure, creating sub-packages for `DDBLoader`.

2. Update `DDBHFTLoader` to better handle the journal file.
3. Move the entrance scripts to `src/run.py`.
main
Guofu Li 2 years ago
parent 84e89f70aa
commit 0ca31dcd47

@ -12,5 +12,5 @@ Open,/Users/guofu/Workspaces/dolphindb/test2/scripts/pit_series_report_period_at
Server,.167,192.168.1.167,8848,,admin,123456 Server,.167,192.168.1.167,8848,,admin,123456
Server,.7,192.168.1.7,8848,,admin,123456 Server,.7,192.168.1.7,8848,,admin,123456
Server,local8848,localhost,8848,,, Server,local8848,localhost,8848,,,
ActiveFile,/Users/guofu/Workspaces/dolphindb/test2/scripts/pit6.dos ActiveFile,/Users/guofu/Workspaces/dolphindb/test2/scripts/test2.dos
ActiveServer,.167 ActiveServer,.7

@ -48,25 +48,130 @@
}, },
{ {
"cell_type": "code", "cell_type": "code",
"execution_count": 68, "execution_count": 32,
"id": "8b7dae3d-aef1-4c50-92b2-460d4fea0a96", "id": "c8d07fc8-d80c-490f-9220-0d3e8e4c72a4",
"metadata": {}, "metadata": {},
"outputs": [ "outputs": [
{ {
"data": { "data": {
"text/plain": [ "text/plain": [
"114350" "50"
] ]
}, },
"execution_count": 68, "execution_count": 32,
"metadata": {}, "metadata": {},
"output_type": "execute_result" "output_type": "execute_result"
} }
], ],
"source": [ "source": [
"sess = ddb.session('localhost', 8848)\n",
"sess.login('admin', '123456')\n",
"\n",
"# backup(backup_path, sql_obj, force, parallel)\n",
"code = \"\"\"\n",
" backup('/data/dolphindb/backup/', <select * from loadTable(\"dfs://pit_stock_ts\", \"bs_common_ori\")>, true, false);\n",
" backup('/data/dolphindb/backup/', <select * from loadTable(\"dfs://pit_stock_ts\", \"bs_common_adj\")>, true, false);\n",
" \n",
" backup('/data/dolphindb/backup/', <select * from loadTable(\"dfs://pit_stock_ts\", \"cfs_common_ori\")>, true, false);\n",
" backup('/data/dolphindb/backup/', <select * from loadTable(\"dfs://pit_stock_ts\", \"cfs_common_adj\")>, true, false);\n",
" \n",
" backup('/data/dolphindb/backup/', <select * from loadTable(\"dfs://pit_stock_ts\", \"is_common_adj\")>, true, false);\n",
" backup('/data/dolphindb/backup/', <select * from loadTable(\"dfs://pit_stock_ts\", \"is_common_adj\")>, true, false);\n",
" \n",
" backup('/data/dolphindb/backup/', <select * from loadTable(\"dfs://pit_stock_ts\", \"divident\")>, true, false);\n",
" backup('/data/dolphindb/backup/', <select * from loadTable(\"dfs://pit_stock_ts\", \"earnings_preannouncement\")>, true, false);\n",
" backup('/data/dolphindb/backup/', <select * from loadTable(\"dfs://pit_stock_ts\", \"preliminary_earnings_estimate\")>, true, false);\n",
"\"\"\"\n",
"\n",
"sess.run(code)" "sess.run(code)"
] ]
}, },
{
"cell_type": "code",
"execution_count": 33,
"id": "8b7dae3d-aef1-4c50-92b2-460d4fea0a96",
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"<div>\n",
"<style scoped>\n",
" .dataframe tbody tr th:only-of-type {\n",
" vertical-align: middle;\n",
" }\n",
"\n",
" .dataframe tbody tr th {\n",
" vertical-align: top;\n",
" }\n",
"\n",
" .dataframe thead th {\n",
" text-align: right;\n",
" }\n",
"</style>\n",
"<table border=\"1\" class=\"dataframe\">\n",
" <thead>\n",
" <tr style=\"text-align: right;\">\n",
" <th></th>\n",
" <th>tableName</th>\n",
" <th>physicalIndex</th>\n",
" </tr>\n",
" </thead>\n",
" <tbody>\n",
" <tr>\n",
" <th>0</th>\n",
" <td>daily_kline</td>\n",
" <td>uoH</td>\n",
" </tr>\n",
" <tr>\n",
" <th>1</th>\n",
" <td>hft_daily_factor</td>\n",
" <td>u6J</td>\n",
" </tr>\n",
" </tbody>\n",
"</table>\n",
"</div>"
],
"text/plain": [
" tableName physicalIndex\n",
"0 daily_kline uoH\n",
"1 hft_daily_factor u6J"
]
},
"execution_count": 33,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"sess.run(\"\"\"\n",
" listTables(\"dfs://daily_stock_ts\");\n",
"\"\"\")"
]
},
{
"cell_type": "code",
"execution_count": 34,
"id": "8af3381d-9fdc-4cb0-b3a9-d6538db97476",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"50"
]
},
"execution_count": 34,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"sess.run(\"\"\"\n",
" backup('/data/dolphindb/backup/', <select * from loadTable(\"dfs://daily_stock_ts\", \"daily_kline\") >, true, false);\n",
"\"\"\")"
]
},
{ {
"cell_type": "code", "cell_type": "code",
"execution_count": 69, "execution_count": 69,
@ -1113,7 +1218,7 @@
}, },
{ {
"cell_type": "code", "cell_type": "code",
"execution_count": 27, "execution_count": 29,
"id": "d68ea326-82c3-4a7c-97cf-c04dd8aee56b", "id": "d68ea326-82c3-4a7c-97cf-c04dd8aee56b",
"metadata": {}, "metadata": {},
"outputs": [ "outputs": [
@ -1144,18 +1249,18 @@
" <tbody>\n", " <tbody>\n",
" <tr>\n", " <tr>\n",
" <th>0</th>\n", " <th>0</th>\n",
" <td>0</td>\n", " <td>1468390810</td>\n",
" </tr>\n", " </tr>\n",
" </tbody>\n", " </tbody>\n",
"</table>\n", "</table>\n",
"</div>" "</div>"
], ],
"text/plain": [ "text/plain": [
" sum_cnt\n", " sum_cnt\n",
"0 0" "0 1468390810"
] ]
}, },
"execution_count": 27, "execution_count": 29,
"metadata": {}, "metadata": {},
"output_type": "execute_result" "output_type": "execute_result"
} }
@ -1171,32 +1276,327 @@
}, },
{ {
"cell_type": "code", "cell_type": "code",
"execution_count": 28, "execution_count": 38,
"id": "7cbc4906-7756-424a-9ce5-9d2b6d1bab4b", "id": "7cbc4906-7756-424a-9ce5-9d2b6d1bab4b",
"metadata": {}, "metadata": {},
"outputs": [ "outputs": [
{ {
"ename": "RuntimeError", "data": {
"evalue": "<Exception> in run: Server response: 'tbl = loadTable(\"dfs://hft_stock_ts\", \"TickPartitioned\") => getFileBlocksMeta on path '/hft_stock_ts/TickPartitioned.tbl' failed, reason: path does not exist' script: '\ntbl = loadTable(\"dfs://hft_stock_ts\", 'TickPartitioned');\nselect sum(cnt) from (select count(*) as cnt from tbl map);\n'", "text/html": [
"output_type": "error", "<div>\n",
"traceback": [ "<style scoped>\n",
"\u001b[0;31m---------------------------------------------------------------------------\u001b[0m", " .dataframe tbody tr th:only-of-type {\n",
"\u001b[0;31mRuntimeError\u001b[0m Traceback (most recent call last)", " vertical-align: middle;\n",
"Input \u001b[0;32mIn [28]\u001b[0m, in \u001b[0;36m<cell line: 3>\u001b[0;34m()\u001b[0m\n\u001b[1;32m 1\u001b[0m sess \u001b[38;5;241m=\u001b[39m ddb\u001b[38;5;241m.\u001b[39msession(\u001b[38;5;124m'\u001b[39m\u001b[38;5;124m192.168.1.167\u001b[39m\u001b[38;5;124m'\u001b[39m, \u001b[38;5;241m8848\u001b[39m)\n\u001b[1;32m 2\u001b[0m sess\u001b[38;5;241m.\u001b[39mlogin(\u001b[38;5;124m'\u001b[39m\u001b[38;5;124madmin\u001b[39m\u001b[38;5;124m'\u001b[39m, \u001b[38;5;124m'\u001b[39m\u001b[38;5;124m123456\u001b[39m\u001b[38;5;124m'\u001b[39m)\n\u001b[0;32m----> 3\u001b[0m \u001b[43msess\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mrun\u001b[49m\u001b[43m(\u001b[49m\u001b[38;5;124;43m\"\"\"\u001b[39;49m\n\u001b[1;32m 4\u001b[0m \u001b[38;5;124;43mtbl = loadTable(\u001b[39;49m\u001b[38;5;124;43m\"\u001b[39;49m\u001b[38;5;124;43mdfs://hft_stock_ts\u001b[39;49m\u001b[38;5;124;43m\"\u001b[39;49m\u001b[38;5;124;43m, \u001b[39;49m\u001b[38;5;124;43m'\u001b[39;49m\u001b[38;5;124;43mTickPartitioned\u001b[39;49m\u001b[38;5;124;43m'\u001b[39;49m\u001b[38;5;124;43m);\u001b[39;49m\n\u001b[1;32m 5\u001b[0m \u001b[38;5;124;43mselect sum(cnt) from (select count(*) as cnt from tbl map);\u001b[39;49m\n\u001b[1;32m 6\u001b[0m \u001b[38;5;124;43m\"\"\"\u001b[39;49m\u001b[43m)\u001b[49m\n", " }\n",
"File \u001b[0;32m~/.venv/tinysoft/lib/python3.8/site-packages/dolphindb/session.py:161\u001b[0m, in \u001b[0;36msession.run\u001b[0;34m(self, script, *args, **kwargs)\u001b[0m\n\u001b[1;32m 159\u001b[0m \u001b[38;5;28;01mif\u001b[39;00m \u001b[38;5;124m\"\u001b[39m\u001b[38;5;124mfetchSize\u001b[39m\u001b[38;5;124m\"\u001b[39m \u001b[38;5;129;01min\u001b[39;00m kwargs\u001b[38;5;241m.\u001b[39mkeys():\n\u001b[1;32m 160\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m BlockReader(\u001b[38;5;28mself\u001b[39m\u001b[38;5;241m.\u001b[39mcpp\u001b[38;5;241m.\u001b[39mrunBlock(script, \u001b[38;5;241m*\u001b[39m\u001b[38;5;241m*\u001b[39mkwargs))\n\u001b[0;32m--> 161\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m \u001b[38;5;28;43mself\u001b[39;49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mcpp\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mrun\u001b[49m\u001b[43m(\u001b[49m\u001b[43mscript\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[38;5;241;43m*\u001b[39;49m\u001b[43margs\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[38;5;241;43m*\u001b[39;49m\u001b[38;5;241;43m*\u001b[39;49m\u001b[43mkwargs\u001b[49m\u001b[43m)\u001b[49m\n", "\n",
"\u001b[0;31mRuntimeError\u001b[0m: <Exception> in run: Server response: 'tbl = loadTable(\"dfs://hft_stock_ts\", \"TickPartitioned\") => getFileBlocksMeta on path '/hft_stock_ts/TickPartitioned.tbl' failed, reason: path does not exist' script: '\ntbl = loadTable(\"dfs://hft_stock_ts\", 'TickPartitioned');\nselect sum(cnt) from (select count(*) as cnt from tbl map);\n'" " .dataframe tbody tr th {\n",
] " vertical-align: top;\n",
" }\n",
"\n",
" .dataframe thead th {\n",
" text-align: right;\n",
" }\n",
"</style>\n",
"<table border=\"1\" class=\"dataframe\">\n",
" <thead>\n",
" <tr style=\"text-align: right;\">\n",
" <th></th>\n",
" <th>sum_cnt</th>\n",
" </tr>\n",
" </thead>\n",
" <tbody>\n",
" <tr>\n",
" <th>0</th>\n",
" <td>4991926704</td>\n",
" </tr>\n",
" </tbody>\n",
"</table>\n",
"</div>"
],
"text/plain": [
" sum_cnt\n",
"0 4991926704"
]
},
"execution_count": 38,
"metadata": {},
"output_type": "execute_result"
} }
], ],
"source": [ "source": [
"sess = ddb.session('192.168.1.167', 8848)\n", "sess = ddb.session('192.168.1.7', 8848)\n",
"sess.login('admin', '123456')\n", "sess.login('admin', '123456')\n",
"sess.run(\"\"\"\n", "sess.run(\"\"\"\n",
"tbl = loadTable(\"dfs://hft_stock_ts\", 'TickPartitioned');\n", " tbl = loadTable(\"dfs://hft_stock_ts\", 'TickPartitioned');\n",
"select sum(cnt) from (select count(*) as cnt from tbl map);\n", " select sum(cnt) from (select count(*) as cnt from tbl map);\n",
"\"\"\")"
]
},
{
"cell_type": "code",
"execution_count": 39,
"id": "e9ab5e57-dce5-4426-9bac-4238cd067197",
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"<div>\n",
"<style scoped>\n",
" .dataframe tbody tr th:only-of-type {\n",
" vertical-align: middle;\n",
" }\n",
"\n",
" .dataframe tbody tr th {\n",
" vertical-align: top;\n",
" }\n",
"\n",
" .dataframe thead th {\n",
" text-align: right;\n",
" }\n",
"</style>\n",
"<table border=\"1\" class=\"dataframe\">\n",
" <thead>\n",
" <tr style=\"text-align: right;\">\n",
" <th></th>\n",
" <th>sum_cnt</th>\n",
" </tr>\n",
" </thead>\n",
" <tbody>\n",
" <tr>\n",
" <th>0</th>\n",
" <td>9388749</td>\n",
" </tr>\n",
" </tbody>\n",
"</table>\n",
"</div>"
],
"text/plain": [
" sum_cnt\n",
"0 9388749"
]
},
"execution_count": 39,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"sess.run(\"\"\"\n",
" select sum(cnt) from (select count(*) as cnt from tbl where code='002182.SZ' map);\n",
"\"\"\")"
]
},
{
"cell_type": "code",
"execution_count": 43,
"id": "4ba45027-bbb5-4b27-99da-3452cc8d2f1c",
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"<div>\n",
"<style scoped>\n",
" .dataframe tbody tr th:only-of-type {\n",
" vertical-align: middle;\n",
" }\n",
"\n",
" .dataframe tbody tr th {\n",
" vertical-align: top;\n",
" }\n",
"\n",
" .dataframe thead th {\n",
" text-align: right;\n",
" }\n",
"</style>\n",
"<table border=\"1\" class=\"dataframe\">\n",
" <thead>\n",
" <tr style=\"text-align: right;\">\n",
" <th></th>\n",
" <th>count</th>\n",
" </tr>\n",
" </thead>\n",
" <tbody>\n",
" <tr>\n",
" <th>0</th>\n",
" <td>2298</td>\n",
" </tr>\n",
" </tbody>\n",
"</table>\n",
"</div>"
],
"text/plain": [
" count\n",
"0 2298"
]
},
"execution_count": 43,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"sess.run(\"\"\"\n",
" select count(*) from (\n",
" select code, m_nDate, count(*) as cnt from tbl where code='002182.SZ' group by code, m_nDate map\n",
" );\n",
"\"\"\")" "\"\"\")"
] ]
}, },
{
"cell_type": "code",
"execution_count": 57,
"id": "29ab8af5-e571-4064-b691-a186d9fb4d08",
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"<div>\n",
"<style scoped>\n",
" .dataframe tbody tr th:only-of-type {\n",
" vertical-align: middle;\n",
" }\n",
"\n",
" .dataframe tbody tr th {\n",
" vertical-align: top;\n",
" }\n",
"\n",
" .dataframe thead th {\n",
" text-align: right;\n",
" }\n",
"</style>\n",
"<table border=\"1\" class=\"dataframe\">\n",
" <thead>\n",
" <tr style=\"text-align: right;\">\n",
" <th></th>\n",
" </tr>\n",
" <tr>\n",
" <th>m_nDate</th>\n",
" </tr>\n",
" </thead>\n",
" <tbody>\n",
" <tr>\n",
" <th>2013-01-04</th>\n",
" </tr>\n",
" <tr>\n",
" <th>2013-01-07</th>\n",
" </tr>\n",
" <tr>\n",
" <th>2013-01-08</th>\n",
" </tr>\n",
" <tr>\n",
" <th>2013-01-09</th>\n",
" </tr>\n",
" <tr>\n",
" <th>2013-01-10</th>\n",
" </tr>\n",
" <tr>\n",
" <th>...</th>\n",
" </tr>\n",
" <tr>\n",
" <th>2022-07-04</th>\n",
" </tr>\n",
" <tr>\n",
" <th>2022-07-05</th>\n",
" </tr>\n",
" <tr>\n",
" <th>2022-07-06</th>\n",
" </tr>\n",
" <tr>\n",
" <th>2022-07-07</th>\n",
" </tr>\n",
" <tr>\n",
" <th>2022-07-08</th>\n",
" </tr>\n",
" </tbody>\n",
"</table>\n",
"<p>2298 rows × 0 columns</p>\n",
"</div>"
],
"text/plain": [
"Empty DataFrame\n",
"Columns: []\n",
"Index: [2013-01-04 00:00:00, 2013-01-07 00:00:00, 2013-01-08 00:00:00, 2013-01-09 00:00:00, 2013-01-10 00:00:00, 2013-01-11 00:00:00, 2013-01-14 00:00:00, 2013-01-15 00:00:00, 2013-01-16 00:00:00, 2013-01-17 00:00:00, 2013-01-18 00:00:00, 2013-01-21 00:00:00, 2013-01-22 00:00:00, 2013-01-23 00:00:00, 2013-01-24 00:00:00, 2013-01-25 00:00:00, 2013-01-28 00:00:00, 2013-01-29 00:00:00, 2013-01-30 00:00:00, 2013-01-31 00:00:00, 2013-02-01 00:00:00, 2013-02-04 00:00:00, 2013-02-05 00:00:00, 2013-02-06 00:00:00, 2013-02-07 00:00:00, 2013-02-08 00:00:00, 2013-02-18 00:00:00, 2013-02-19 00:00:00, 2013-02-20 00:00:00, 2013-02-21 00:00:00, 2013-02-22 00:00:00, 2013-02-25 00:00:00, 2013-02-26 00:00:00, 2013-02-27 00:00:00, 2013-02-28 00:00:00, 2013-03-01 00:00:00, 2013-03-04 00:00:00, 2013-03-05 00:00:00, 2013-03-06 00:00:00, 2013-03-07 00:00:00, 2013-03-08 00:00:00, 2013-03-11 00:00:00, 2013-03-12 00:00:00, 2013-03-13 00:00:00, 2013-03-14 00:00:00, 2013-03-15 00:00:00, 2013-03-18 00:00:00, 2013-03-19 00:00:00, 2013-03-20 00:00:00, 2013-03-21 00:00:00, 2013-03-22 00:00:00, 2013-03-25 00:00:00, 2013-03-26 00:00:00, 2013-03-27 00:00:00, 2013-03-28 00:00:00, 2013-03-29 00:00:00, 2013-04-01 00:00:00, 2013-04-02 00:00:00, 2013-04-03 00:00:00, 2013-04-08 00:00:00, 2013-04-09 00:00:00, 2013-04-10 00:00:00, 2013-04-11 00:00:00, 2013-04-12 00:00:00, 2013-04-15 00:00:00, 2013-04-16 00:00:00, 2013-04-17 00:00:00, 2013-04-18 00:00:00, 2013-04-19 00:00:00, 2013-04-22 00:00:00, 2013-04-23 00:00:00, 2013-04-24 00:00:00, 2013-04-25 00:00:00, 2013-04-26 00:00:00, 2013-05-02 00:00:00, 2013-05-03 00:00:00, 2013-05-06 00:00:00, 2013-05-07 00:00:00, 2013-05-08 00:00:00, 2013-05-09 00:00:00, 2013-05-10 00:00:00, 2013-05-13 00:00:00, 2013-05-14 00:00:00, 2013-05-15 00:00:00, 2013-05-16 00:00:00, 2013-05-17 00:00:00, 2013-05-20 00:00:00, 2013-05-21 00:00:00, 2013-05-22 00:00:00, 2013-05-23 00:00:00, 2013-05-24 00:00:00, 2013-05-27 00:00:00, 2013-05-28 00:00:00, 2013-05-29 00:00:00, 2013-05-30 00:00:00, 2013-05-31 00:00:00, 2013-06-03 00:00:00, 2013-06-04 00:00:00, 2013-06-05 00:00:00, 2013-06-06 00:00:00, ...]\n",
"\n",
"[2298 rows x 0 columns]"
]
},
"execution_count": 57,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"df = sess.run(\"\"\"\n",
" select m_nDate from tbl where code='002182.SZ' group by m_nDate map;\n",
"\"\"\").set_index('m_nDate')\n",
"df"
]
},
{
"cell_type": "code",
"execution_count": 51,
"id": "013d191b-2a41-4d7b-8382-b6ee8764c2d0",
"metadata": {},
"outputs": [],
"source": [
"import sqlalchemy as sa\n",
"engine = sa.create_engine(\n",
" 'mssql+pyodbc://sa:passw0rd!@192.168.1.7/master?driver=ODBC+Driver+18+for+SQL+Server',\n",
" connect_args = {\n",
" \"TrustServerCertificate\": \"yes\"\n",
" }, echo=False)"
]
},
{
"cell_type": "code",
"execution_count": 54,
"id": "c90e8e9d-cde0-4a7a-afd3-ea771483f001",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"'20130104'"
]
},
"execution_count": 54,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"with engine.connect() as conn:\n",
" rows = conn.execute(\"select * from [Level2BytesTick].dbo.[Tick] where S_INFO_WINDCODE='002182.SZ'\").fetchall()\n",
" rows = [row for row in rows]\n",
" "
]
},
{
"cell_type": "code",
"execution_count": 58,
"id": "525dfcf1-720a-4b65-90b5-28d0c2118cf5",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"True"
]
},
"execution_count": 58,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"import pandas as pd\n",
"dt = pd.to_datetime(rows[0][1], format='%Y%m%d')\n",
"dt in df.index"
]
},
{ {
"cell_type": "code", "cell_type": "code",
"execution_count": 31, "execution_count": 31,
@ -1231,14 +1631,221 @@
"\t} while (del_date <= 2022.12.31)\n", "\t} while (del_date <= 2022.12.31)\n",
"}\n", "}\n",
"\"\"\"\n", "\"\"\"\n",
"sess.run(code)" "# 这段代码运行好赶紧注释掉,非常危险\n",
"# sess.run(code)"
] ]
}, },
{ {
"cell_type": "code", "cell_type": "code",
"execution_count": null, "execution_count": 37,
"id": "ea0501e7-d416-45ce-add5-e443c55f158c", "id": "ea0501e7-d416-45ce-add5-e443c55f158c",
"metadata": {}, "metadata": {},
"outputs": [
{
"data": {
"text/html": [
"<div>\n",
"<style scoped>\n",
" .dataframe tbody tr th:only-of-type {\n",
" vertical-align: middle;\n",
" }\n",
"\n",
" .dataframe tbody tr th {\n",
" vertical-align: top;\n",
" }\n",
"\n",
" .dataframe thead th {\n",
" text-align: right;\n",
" }\n",
"</style>\n",
"<table border=\"1\" class=\"dataframe\">\n",
" <thead>\n",
" <tr style=\"text-align: right;\">\n",
" <th></th>\n",
" <th>code</th>\n",
" <th>industry_code</th>\n",
" <th>industry_name</th>\n",
" <th>industry_level</th>\n",
" <th>enter_date</th>\n",
" <th>exit_date</th>\n",
" <th>is_new</th>\n",
" <th>industry_class</th>\n",
" </tr>\n",
" </thead>\n",
" <tbody>\n",
" <tr>\n",
" <th>0</th>\n",
" <td>000017.SZ</td>\n",
" <td>SWHY310000</td>\n",
" <td>申万交运设备</td>\n",
" <td>1</td>\n",
" <td>1992-03-31</td>\n",
" <td>2011-10-10</td>\n",
" <td>0</td>\n",
" <td>SWHY</td>\n",
" </tr>\n",
" <tr>\n",
" <th>1</th>\n",
" <td>000017.SZ</td>\n",
" <td>SWHY310300</td>\n",
" <td>申万非汽车交运设备</td>\n",
" <td>2</td>\n",
" <td>1992-03-31</td>\n",
" <td>2011-10-10</td>\n",
" <td>0</td>\n",
" <td>SWHY</td>\n",
" </tr>\n",
" <tr>\n",
" <th>2</th>\n",
" <td>000017.SZ</td>\n",
" <td>SWHY310301</td>\n",
" <td>申万摩托车</td>\n",
" <td>3</td>\n",
" <td>1992-03-31</td>\n",
" <td>2011-10-10</td>\n",
" <td>0</td>\n",
" <td>SWHY</td>\n",
" </tr>\n",
" <tr>\n",
" <th>3</th>\n",
" <td>000017.SZ</td>\n",
" <td>SWHY310000</td>\n",
" <td>申万交运设备</td>\n",
" <td>1</td>\n",
" <td>2011-10-10</td>\n",
" <td>2014-02-21</td>\n",
" <td>0</td>\n",
" <td>SWHY</td>\n",
" </tr>\n",
" <tr>\n",
" <th>4</th>\n",
" <td>000017.SZ</td>\n",
" <td>SWHY310300</td>\n",
" <td>申万非汽车交运设备</td>\n",
" <td>2</td>\n",
" <td>2011-10-10</td>\n",
" <td>2014-02-21</td>\n",
" <td>0</td>\n",
" <td>SWHY</td>\n",
" </tr>\n",
" <tr>\n",
" <th>...</th>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" </tr>\n",
" <tr>\n",
" <th>95</th>\n",
" <td>000157.SZ</td>\n",
" <td>SWHY260202</td>\n",
" <td>申万工程机械</td>\n",
" <td>3</td>\n",
" <td>2000-10-12</td>\n",
" <td>2014-02-21</td>\n",
" <td>0</td>\n",
" <td>SWHY</td>\n",
" </tr>\n",
" <tr>\n",
" <th>96</th>\n",
" <td>000157.SZ</td>\n",
" <td>SWHY640000</td>\n",
" <td>申万机械设备</td>\n",
" <td>1</td>\n",
" <td>2014-02-21</td>\n",
" <td>2021-12-13</td>\n",
" <td>0</td>\n",
" <td>SWHY</td>\n",
" </tr>\n",
" <tr>\n",
" <th>97</th>\n",
" <td>000157.SZ</td>\n",
" <td>SWHY640200</td>\n",
" <td>申万专用设备</td>\n",
" <td>2</td>\n",
" <td>2014-02-21</td>\n",
" <td>2021-12-13</td>\n",
" <td>0</td>\n",
" <td>SWHY</td>\n",
" </tr>\n",
" <tr>\n",
" <th>98</th>\n",
" <td>000157.SZ</td>\n",
" <td>SWHY640201</td>\n",
" <td>申万工程机械</td>\n",
" <td>3</td>\n",
" <td>2014-02-21</td>\n",
" <td>2021-12-13</td>\n",
" <td>0</td>\n",
" <td>SWHY</td>\n",
" </tr>\n",
" <tr>\n",
" <th>99</th>\n",
" <td>000157.SZ</td>\n",
" <td>SWHY640000</td>\n",
" <td>申万机械设备</td>\n",
" <td>1</td>\n",
" <td>2021-12-13</td>\n",
" <td>NaT</td>\n",
" <td>1</td>\n",
" <td>SWHY</td>\n",
" </tr>\n",
" </tbody>\n",
"</table>\n",
"<p>100 rows × 8 columns</p>\n",
"</div>"
],
"text/plain": [
" code industry_code industry_name industry_level enter_date \\\n",
"0 000017.SZ SWHY310000 申万交运设备 1 1992-03-31 \n",
"1 000017.SZ SWHY310300 申万非汽车交运设备 2 1992-03-31 \n",
"2 000017.SZ SWHY310301 申万摩托车 3 1992-03-31 \n",
"3 000017.SZ SWHY310000 申万交运设备 1 2011-10-10 \n",
"4 000017.SZ SWHY310300 申万非汽车交运设备 2 2011-10-10 \n",
".. ... ... ... ... ... \n",
"95 000157.SZ SWHY260202 申万工程机械 3 2000-10-12 \n",
"96 000157.SZ SWHY640000 申万机械设备 1 2014-02-21 \n",
"97 000157.SZ SWHY640200 申万专用设备 2 2014-02-21 \n",
"98 000157.SZ SWHY640201 申万工程机械 3 2014-02-21 \n",
"99 000157.SZ SWHY640000 申万机械设备 1 2021-12-13 \n",
"\n",
" exit_date is_new industry_class \n",
"0 2011-10-10 0 SWHY \n",
"1 2011-10-10 0 SWHY \n",
"2 2011-10-10 0 SWHY \n",
"3 2014-02-21 0 SWHY \n",
"4 2014-02-21 0 SWHY \n",
".. ... ... ... \n",
"95 2014-02-21 0 SWHY \n",
"96 2021-12-13 0 SWHY \n",
"97 2021-12-13 0 SWHY \n",
"98 2021-12-13 0 SWHY \n",
"99 NaT 1 SWHY \n",
"\n",
"[100 rows x 8 columns]"
]
},
"execution_count": 37,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"sess.run(\"\"\"\n",
"tbl = loadTable(\"dfs://info_stock_ts\", 'info_industry');\n",
"select top 100 * from tbl map;\n",
"\"\"\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "16dd860d-049f-420e-8d04-d1f8969e5ed1",
"metadata": {},
"outputs": [], "outputs": [],
"source": [] "source": []
} }

@ -0,0 +1,333 @@
{
"cells": [
{
"cell_type": "code",
"execution_count": 1,
"id": "2b46ea32-3e2e-466b-ac6a-9874336551af",
"metadata": {},
"outputs": [],
"source": [
"import dolphindb as ddb\n",
"\n",
"sess = ddb.session('localhost', 8848)\n",
"sess.login('admin', '123456')"
]
},
{
"cell_type": "code",
"execution_count": 6,
"id": "6cc64a77-a5ad-46b6-8204-b188ee308ad3",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"{'partitionType': 5,\n",
" 'partitionColumnType': 17,\n",
" 'partitionColumnIndex': 0,\n",
" 'chunkPath': None,\n",
" 'colDefs': name typeString typeInt comment\n",
" 0 code SYMBOL 17 \n",
" 1 report_period DATE 6 \n",
" 2 appear_at_date DATE 6 \n",
" 3 S_PROFITNOTICE_STYLE STRING 18 \n",
" 4 S_PROFITNOTICE_CHANGEMIN DOUBLE 16 \n",
" 5 S_PROFITNOTICE_CHANGEMAX DOUBLE 16 \n",
" 6 S_PROFITNOTICE_NETPROFITMIN DOUBLE 16 \n",
" 7 S_PROFITNOTICE_NETPROFITMAX DOUBLE 16 \n",
" 8 S_PROFITNOTICE_REASON STRING 18 ,\n",
" 'chunkGranularity': 'TABLE',\n",
" 'partitionTypeName': 'HASH',\n",
" 'keepDuplicates': 'ALL',\n",
" 'engineType': 'TSDB',\n",
" 'partitionColumnName': 'code',\n",
" 'partitionSchema': 50,\n",
" 'sortColumns': array(['code', 'report_period', 'appear_at_date'], dtype=object),\n",
" 'partitionSites': None}"
]
},
"execution_count": 6,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"sess.run(\"tbl_ep = loadTable('dfs://pit_stock_ts', 'earnings_preannouncement')\")\n",
"sess.run(\"schema(tbl_ep)\")"
]
},
{
"cell_type": "code",
"execution_count": 10,
"id": "39c05247-deac-4c49-a20d-e19e637c6081",
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"<div>\n",
"<style scoped>\n",
" .dataframe tbody tr th:only-of-type {\n",
" vertical-align: middle;\n",
" }\n",
"\n",
" .dataframe tbody tr th {\n",
" vertical-align: top;\n",
" }\n",
"\n",
" .dataframe thead th {\n",
" text-align: right;\n",
" }\n",
"</style>\n",
"<table border=\"1\" class=\"dataframe\">\n",
" <thead>\n",
" <tr style=\"text-align: right;\">\n",
" <th></th>\n",
" <th>code</th>\n",
" <th>report_period</th>\n",
" <th>cnt</th>\n",
" </tr>\n",
" </thead>\n",
" <tbody>\n",
" <tr>\n",
" <th>0</th>\n",
" <td>600543.SH</td>\n",
" <td>2021-12-31</td>\n",
" <td>3</td>\n",
" </tr>\n",
" <tr>\n",
" <th>1</th>\n",
" <td>605499.SH</td>\n",
" <td>2021-06-30</td>\n",
" <td>3</td>\n",
" </tr>\n",
" <tr>\n",
" <th>2</th>\n",
" <td>603729.SH</td>\n",
" <td>2020-12-31</td>\n",
" <td>3</td>\n",
" </tr>\n",
" <tr>\n",
" <th>3</th>\n",
" <td>002058.SZ</td>\n",
" <td>2020-12-31</td>\n",
" <td>3</td>\n",
" </tr>\n",
" <tr>\n",
" <th>4</th>\n",
" <td>601599.SH</td>\n",
" <td>2020-12-31</td>\n",
" <td>3</td>\n",
" </tr>\n",
" <tr>\n",
" <th>...</th>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" </tr>\n",
" <tr>\n",
" <th>295</th>\n",
" <td>600813.SH</td>\n",
" <td>2002-06-30</td>\n",
" <td>4</td>\n",
" </tr>\n",
" <tr>\n",
" <th>296</th>\n",
" <td>000689.SZ</td>\n",
" <td>2002-06-30</td>\n",
" <td>5</td>\n",
" </tr>\n",
" <tr>\n",
" <th>297</th>\n",
" <td>000653.SZ</td>\n",
" <td>2002-06-30</td>\n",
" <td>4</td>\n",
" </tr>\n",
" <tr>\n",
" <th>298</th>\n",
" <td>000658.SZ</td>\n",
" <td>2002-06-30</td>\n",
" <td>7</td>\n",
" </tr>\n",
" <tr>\n",
" <th>299</th>\n",
" <td>600853.SH</td>\n",
" <td>2001-12-31</td>\n",
" <td>3</td>\n",
" </tr>\n",
" </tbody>\n",
"</table>\n",
"<p>300 rows × 3 columns</p>\n",
"</div>"
],
"text/plain": [
" code report_period cnt\n",
"0 600543.SH 2021-12-31 3\n",
"1 605499.SH 2021-06-30 3\n",
"2 603729.SH 2020-12-31 3\n",
"3 002058.SZ 2020-12-31 3\n",
"4 601599.SH 2020-12-31 3\n",
".. ... ... ...\n",
"295 600813.SH 2002-06-30 4\n",
"296 000689.SZ 2002-06-30 5\n",
"297 000653.SZ 2002-06-30 4\n",
"298 000658.SZ 2002-06-30 7\n",
"299 600853.SH 2001-12-31 3\n",
"\n",
"[300 rows x 3 columns]"
]
},
"execution_count": 10,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"sess.run(\"\"\"\n",
" select code, report_period, count(*) as cnt from tbl_ep group by code, report_period having count(*) > 2 order by report_period desc \n",
"\"\"\")"
]
},
{
"cell_type": "code",
"execution_count": 11,
"id": "74623941-5f6e-48fc-b2a0-7050ae6c6045",
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"<div>\n",
"<style scoped>\n",
" .dataframe tbody tr th:only-of-type {\n",
" vertical-align: middle;\n",
" }\n",
"\n",
" .dataframe tbody tr th {\n",
" vertical-align: top;\n",
" }\n",
"\n",
" .dataframe thead th {\n",
" text-align: right;\n",
" }\n",
"</style>\n",
"<table border=\"1\" class=\"dataframe\">\n",
" <thead>\n",
" <tr style=\"text-align: right;\">\n",
" <th></th>\n",
" <th>code</th>\n",
" <th>report_period</th>\n",
" <th>appear_at_date</th>\n",
" <th>S_PROFITNOTICE_STYLE</th>\n",
" <th>S_PROFITNOTICE_CHANGEMIN</th>\n",
" <th>S_PROFITNOTICE_CHANGEMAX</th>\n",
" <th>S_PROFITNOTICE_NETPROFITMIN</th>\n",
" <th>S_PROFITNOTICE_NETPROFITMAX</th>\n",
" <th>S_PROFITNOTICE_REASON</th>\n",
" </tr>\n",
" </thead>\n",
" <tbody>\n",
" <tr>\n",
" <th>0</th>\n",
" <td>600543.SH</td>\n",
" <td>2021-12-31</td>\n",
" <td>2021-10-30</td>\n",
" <td>预亏</td>\n",
" <td>0.0</td>\n",
" <td>0.0</td>\n",
" <td>0.0</td>\n",
" <td>0.0</td>\n",
" <td>1-9月公司累计营业收入增长39.61%但较2019年同期仍有明显下滑目前处于亏损状态...</td>\n",
" </tr>\n",
" <tr>\n",
" <th>1</th>\n",
" <td>600543.SH</td>\n",
" <td>2021-12-31</td>\n",
" <td>2022-01-29</td>\n",
" <td>预亏</td>\n",
" <td>-4939.0</td>\n",
" <td>-4939.0</td>\n",
" <td>-9000.0</td>\n",
" <td>-9000.0</td>\n",
" <td>1、主营业务影响。受新冠肺炎疫情等因素影响消费市场低迷流通环节受限、下游需求收缩公司葡...</td>\n",
" </tr>\n",
" <tr>\n",
" <th>2</th>\n",
" <td>600543.SH</td>\n",
" <td>2021-12-31</td>\n",
" <td>2022-03-31</td>\n",
" <td>预亏</td>\n",
" <td>-5408.0</td>\n",
" <td>-5408.0</td>\n",
" <td>-9872.0</td>\n",
" <td>-9872.0</td>\n",
" <td>公司财务部门将2021年度计提的固定资产减值准备4830万元作非经常性损益处理在审计时...</td>\n",
" </tr>\n",
" </tbody>\n",
"</table>\n",
"</div>"
],
"text/plain": [
" code report_period appear_at_date S_PROFITNOTICE_STYLE \\\n",
"0 600543.SH 2021-12-31 2021-10-30 预亏 \n",
"1 600543.SH 2021-12-31 2022-01-29 预亏 \n",
"2 600543.SH 2021-12-31 2022-03-31 预亏 \n",
"\n",
" S_PROFITNOTICE_CHANGEMIN S_PROFITNOTICE_CHANGEMAX \\\n",
"0 0.0 0.0 \n",
"1 -4939.0 -4939.0 \n",
"2 -5408.0 -5408.0 \n",
"\n",
" S_PROFITNOTICE_NETPROFITMIN S_PROFITNOTICE_NETPROFITMAX \\\n",
"0 0.0 0.0 \n",
"1 -9000.0 -9000.0 \n",
"2 -9872.0 -9872.0 \n",
"\n",
" S_PROFITNOTICE_REASON \n",
"0 1-9月公司累计营业收入增长39.61%但较2019年同期仍有明显下滑目前处于亏损状态... \n",
"1 1、主营业务影响。受新冠肺炎疫情等因素影响消费市场低迷流通环节受限、下游需求收缩公司葡... \n",
"2 公司财务部门将2021年度计提的固定资产减值准备4830万元作非经常性损益处理在审计时... "
]
},
"execution_count": 11,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"sess.run(\"\"\"\n",
"select * from tbl_ep where code='600543.SH' and report_period=2021.12.31\n",
"\"\"\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "1425df79-bfaf-414a-9a12-f3d7e1231e5e",
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.8.13"
}
},
"nbformat": 4,
"nbformat_minor": 5
}

@ -2,7 +2,7 @@
"cells": [ "cells": [
{ {
"cell_type": "code", "cell_type": "code",
"execution_count": 28, "execution_count": 2,
"metadata": {}, "metadata": {},
"outputs": [], "outputs": [],
"source": [ "source": [
@ -330,20 +330,20 @@
}, },
{ {
"cell_type": "code", "cell_type": "code",
"execution_count": 53, "execution_count": 4,
"metadata": {}, "metadata": {},
"outputs": [ "outputs": [
{ {
"name": "stdout", "name": "stdout",
"output_type": "stream", "output_type": "stream",
"text": [ "text": [
"(4708,)\n" "(2298,)\n"
] ]
} }
], ],
"source": [ "source": [
"with engine.connect() as conn:\n", "with engine.connect() as conn:\n",
" stat = \"select count(*) from Level2BytesKLine.dbo.KLine where TRADE_DT='20220608'\"\n", " stat = \"select count(*) from Level2BytesTick.dbo.Tick where S_INFO_WINDCODE='002182.SZ'\"\n",
" rs = conn.execute(stat)\n", " rs = conn.execute(stat)\n",
" for row in rs.fetchall():\n", " for row in rs.fetchall():\n",
" print(row)" " print(row)"

@ -13,6 +13,6 @@ class DDBBase(object):
def __init__(self): def __init__(self):
self.ddb_sess = ddb.session(self.ddb_config['host'], 8848) self.ddb_sess = ddb.session(self.ddb_config['host'], 8848)
self.login(self.ddb_config['username'], self.ddb_config['password']) self.ddb_sess.login(self.ddb_config['username'], self.ddb_config['password'])

@ -1,950 +0,0 @@
import importlib
import gzip
import pickle
import functools
import abc
import warnings
from pprint import pprint
from pathlib import Path
from tqdm import tqdm
#from tqdm.contrib.concurrent import process_map
from multiprocessing import Pool
import numpy as np
import pandas as pd
from pandas.core.common import SettingWithCopyWarning
warnings.simplefilter(action="ignore", category=SettingWithCopyWarning)
import dolphindb as ddb
import dolphindb.settings as keys
import sqlalchemy as sa
import ProtoBuffEntitys
from DDBBase import DDBBase
class DDBLoader(DDBBase):
"""
- 放了几个公用的配置字段包括
1. SQL-Server的链接参数
2. DolphinDB的链接参数
- 放了几个@abstractmethod在里面不过如果不需要使用多态特性那应该用处不大
1. create_ddb_database
2. create_ddb_partition_table
"""
mssql_config = {
'host' : '192.168.1.7',
'username' : 'sa',
'password' : 'passw0rd!'
}
def __init__(self):
super().__init__()
self.mssql_engine = sa.create_engine(
"mssql+pyodbc://{username}:{password}@{host}/master?driver=ODBC+Driver+18+for+SQL+Server".format(**self.mssql_config),
connect_args = {
"TrustServerCertificate": "yes"
}, echo=False
)
@abc.abstractmethod
def create_ddb_database(self, *args, **kwargs):
"""
创建database函数需要被子类具体实现
"""
return
@abc.abstractmethod
def create_ddb_partition_table(self, *args, **kwargs):
"""
创建分区表函数需要被子类具体实现
"""
return
@staticmethod
def tscode_to_windcode(series):
return series.apply(lambda x : x[2:] + '.' + x[:2])
@staticmethod
def make_symbol(series):
return series.astype('int32').astype('str')\
.apply(str.zfill, args=(6,))\
.apply(lambda code : \
code + '.SH' if code[0] == '6' \
else code + '.SZ')
@staticmethod
def make_date(series):
# 特别是对于分红表如果某些关键日期还未公布则会填充0导致日期解析失败
series.loc[series == 0] = np.nan
return pd.to_datetime(
series.astype(str), format='%Y%m%d')
@staticmethod
def make_nparray(series):
return series.apply(lambda x : np.array(x))
@staticmethod
def make_time(series):
s_hr = series // 10000000 * 3600000
s_min = series % 10000000 // 100000 * 60000
s_sec = series % 100000 // 1000
s_ms = series % 1000
return pd.to_timedelta(s_hr + s_min + s_sec + s_ms, unit='ms')
class DDBPITLoader(DDBLoader):
ddb_path = "dfs://pit_stock_ts"
ddb_dbname = "ddb_pit_stock_ts"
num_code_partition = 50
table_name_mapping = {
#'CBS_AFTER_ADJ' : 'bs_common_adj',
#'CBS_BEFORE_ADJ' : 'bs_common_ori',
#'CCFS_AFTER_ADJ' : 'cfs_common_adj',
#'CCFS_BEFORE_ADJ' : 'cfs_common_ori',
#'CIS_AFTER_ADJ' : 'is_common_adj',
#'CIS_BEFORE_ADJ' : 'is_common_ori',
'DIV_WIND' : 'divident',
#'EP_WIND' : 'earnings_preannouncement',
#'PEE_WIND' : 'preliminary_earnings_estimate'
}
meta_col_config = {
'WIND_CODE' : ('code', 'SYMBOL'),
# mssql表中不需要记录的meta字段在这里直接设置为None
'IntCode' : None,
'ACTUAL_ANN_DT' : None,
'ReportPeriod' : ('report_period', 'DATE'),
'AppearInPeriod' : ('appear_in_period', 'DATE'),
'AppearAtDate' : ('appear_at_date', 'DATE')
}
date_col_set = {
'report_period',
'appear_in_period',
'appear_at_date',
'ReportPeriod',
'AppearInPeriod',
'AppearAtDate',
'EQY_RECORD_DT',
'EX_DT',
'DVD_PAYOUT_DT',
'S_DIV_PRELANDATE',
'S_DIV_SMTGDATE',
'DVD_ANN_DT',
'S_DIV_PREANNDT'
}
ddb_type_mapping = {
'float' : 'DOUBLE',
'int' : 'INT',
'text' : 'STRING',
'varchar' : 'STRING',
'str' : 'STRING'
}
# 基本面数据库现在存放在91服务器之上
mssql_config = {
'host' : '192.168.1.91',
'username' : 'sa',
'password' : 'xn.123',
'dbname' : 'tr_statement'
}
def __init__(self):
super().__init__()
# 重新设定mssql_engine对象此时我们需要使用基本面数据库
self.mssql_engine = sa.create_engine(
"mssql+pyodbc://{username}:{password}@{host}/{dbname}?driver=ODBC+Driver+18+for+SQL+Server".format(**self.mssql_config),
connect_args = {
"TrustServerCertificate": "yes"
}, echo=False
)
def create_ddb_database(self):
self.ddb_sess.run("""
{dbName} = database(
directory = '{dbPath}',
partitionType = HASH,
partitionScheme = [SYMBOL, {num_code_partition}],
engine = 'TSDB'
)
""".format(
dbName = self.ddb_dbname,
dbPath = self.ddb_path,
num_code_partition = self.num_code_partition
))
def _make_col_config(self, mssql_table_name):
"""
Return:
mssql_col_name_list, ddb_col_name_list, ddb_col_type_list
"""
with self.mssql_engine.connect() as conn:
col_sp_list = list(conn.execute(f"exec sp_columns {mssql_table_name}").fetchall())
mssql_col_name_list, ddb_col_name_list, ddb_col_type_list = \
[], [], []
for col_sp in col_sp_list:
_col_name = col_sp[3]
_col_type = col_sp[5]
# 对于meta字段需要根据meta配置表来进行处理
if _col_name in self.meta_col_config:
# 跳过mssql表中 不需要记录的meta字段
if self.meta_col_config[_col_name] is None:
continue
# 字段名和字段类型都要进行映射
mssql_col_name_list.append(_col_name)
ddb_col_name_list.append(self.meta_col_config[_col_name][0])
ddb_col_type_list.append(self.meta_col_config[_col_name][1])
# 对于非meta字段仅需要检查是否是float类型对于float类型设置类型为DOUBLE
else:
# 需要之后被转换成DATE的字段一般在原表中为为INT类型
if _col_name in self.date_col_set:
mssql_col_name_list.append(_col_name)
ddb_col_name_list.append(_col_name)
ddb_col_type_list.append('DATE')
# 按照对照表进行类型转换
elif _col_type in self.ddb_type_mapping:
mssql_col_name_list.append(_col_name)
ddb_col_name_list.append(_col_name)
ddb_col_type_list.append(self.ddb_type_mapping[_col_type])
# 对照表中没有的字段类型,就不加入到字段列表中了
else:
print(f"!**Unrecognized type '{_col_type}' for column {_col_name}, will skip.")
return mssql_col_name_list, ddb_col_name_list, ddb_col_type_list
def create_ddb_partition_table(self, mssql_table_name):
"""创建分区表"""
memory_table_name = mssql_table_name
partition_table_name = self.table_name_mapping[mssql_table_name]
mssql_col_name_list, ddb_col_name_list, ddb_col_type_list = \
self._make_col_config(mssql_table_name)
# 根据是否
if 'appear_in_period' in ddb_col_name_list:
compress_methods = """{
'report_period' : 'delta',
'appear_in_period' : 'delta',
'appear_at_date' : 'delta'
}"""
else:
compress_methods = """{
'report_period' : 'delta',
'appear_at_date' : 'delta'
}"""
# 因为已经根据`appear_in_period`分列了调整前和调整后,因此不需要对它再进行排序了
sort_columns = "`code`report_period`appear_at_date"
# 1. 先创建内存表,内存表中设定好列名和列类型
# 2. 然后根据内存表创建分区表,设定分区列等信息
self.ddb_sess.run("""
{memory_table_name} = table(
{capacity}:0,
{column_name_list},
[{column_type_list}]
);
if (existsTable("{ddb_path}", "{partition_table_name}")) {{
dropTable({ddb_dbname}, "{partition_table_name}");
}}
{partition_table_name} = createPartitionedTable(
dbHandle = {ddb_dbname},
table = {memory_table_name},
tableName = "{partition_table_name}",
partitionColumns = 'code',
compressMethods = {compress_methods},
sortColumns = {sort_columns}
);
""".format(
ddb_dbname = self.ddb_dbname,
ddb_path = self.ddb_path,
memory_table_name = memory_table_name,
partition_table_name = partition_table_name,
capacity = 10,
column_name_list = '`' + '`'.join(ddb_col_name_list),
column_type_list = ','.join(ddb_col_type_list),
compress_methods = compress_methods.replace('\n', '').replace(' ', ''),
sort_columns = sort_columns
))
print('-' * 80)
print(f"Did create parition table <{partition_table_name}>:")
pprint(self.ddb_sess.run(f"schema({partition_table_name});"))
return partition_table_name, mssql_col_name_list
def create_ddb_partition_tables(self):
for mssql_table_name in self.table_name_mapping:
self.create_ddb_partition_table(mssql_table_name)
def _dump_pit_to_ddb(self, mssql_table_name):
print('Will work on table', mssql_table_name)
# 返回的`mssql_col_name_list`可以用来对SQL-Server获取的dataframe进行列过滤
partition_table_name, mssql_col_name_list = \
self.create_ddb_partition_table(mssql_table_name)
with self.mssql_engine.connect() as conn:
stat = f"select distinct [WIND_CODE] from {mssql_table_name}"
stock_id_list = list(conn.execute(stat).fetchall())
with tqdm(stock_id_list) as pbar:
for (stock_id,) in pbar:
pbar.set_description(f"Will work on {stock_id}")
#pbar.set_description(f"Will fetch all data of {stock_id} from SQL Server")
stat = """
select * from {mssql_table_name}
where WIND_CODE='{stock_id}' and AppearAtDate>0
""".format(
mssql_table_name = mssql_table_name,
stock_id = stock_id
)
row_list = list(conn.execute(stat).fetchall())
num_rows = len(row_list)
# 因为对AppearAtDate做了过滤所以有可能得到一个空的数据集
if num_rows == 0:
print(f"Will skip {stock_id} due to empty result.")
continue
#pbar.set_description(f"Will work on dumping job on {stock_id} of len {num_rows}")
# 这里需要对select语句获取的所有列进行一次过滤以保证和partition table中的列一致
df = pd.DataFrame(row_list)[mssql_col_name_list]
# 需要把部分字段的int字段类型转换成DATE字段类型
for df_col in df.columns:
if df_col in self.date_col_set:
df[df_col] = DDBLoader.make_date(df[df_col])
# 因为在做数据库View的时候已经做过一轮转换了所以这里就不需要再次转换了
#df['WIND_CODE'] = DDBLoader.tscode_to_windcode(df['WIND_CODE'])
self.ddb_sess.upload({mssql_table_name : df})
self.ddb_sess.run(f"{partition_table_name}.tableInsert({mssql_table_name})")
def dump_pit_to_ddb(self):
for mssql_table_name in self.table_name_mapping:
self._dump_pit_to_ddb(mssql_table_name)
class DDBDailyLoader(DDBLoader):
ddb_path = "dfs://daily_stock_ts"
ddb_dbname = "db_daily_stock_ts"
daily_kline_cols = [
'code', 'm_nDate',
'open', 'high', 'low', 'close', 'vol',
'amount', 'cjbs', 'yclose',
'PctChg', 'IsZt', 'IsDt', 'IsST', 'IsGoDelist',
'FloatShares', 'MarketValues',
'factor'
]
daily_kline_col_types = [
'SYMBOL', 'DATE',
'DOUBLE', 'DOUBLE', 'DOUBLE', 'DOUBLE', 'DOUBLE',
'DOUBLE', 'INT', 'DOUBLE',
'DOUBLE', 'INT', 'INT', 'INT', 'INT',
'DOUBLE', 'DOUBLE',
'DOUBLE'
]
def create_ddb_database(self):
# TODO: daily数据库已经在DDBDailyFactor中被创建了
# 后续可以迁移过来,不过现在就暂时先不管了
pass
def load_ddb_database(self):
self.ddb_sess.run("""
{dbName} = database(directory='{dbPath}')
""".format(
dbName = self.ddb_dbname,
dbPath = self.ddb_path
))
print('Did load database from', self.ddb_path)
def create_ddb_partition_table(self, memory_table_name, partition_table_name):
# TODO: 现在只做一个日频行情数据表,今后可能考虑把基本面数据也迁移过来
# 由于日频行情数据的表结构相对简单,所以直接把表结构写在这里代码里即可
# 搬迁数据的时候需要考虑按照逐个股票来搬迁,以免造成对内存的巨大压力
self.ddb_sess.run("""
// 确保删除原表
if (existsTable("{ddb_daily_path}", "{partition_table_name}")) {{
dropTable({ddb_daily_dbname}, "{partition_table_name}");
}}
// 然后根据内存表的结构创建持久化的分区表
{partition_table_name} = {ddb_daily_dbname}.createPartitionedTable(
table = {memory_table_name},
tableName = "{partition_table_name}",
partitionColumns = `code,
sortColumns = `code`m_nDate,
compressMethods = {{m_nDate:"delta"}}
);
""".format(
ddb_daily_path = self.ddb_path,
ddb_daily_dbname = self.ddb_dbname,
memory_table_name = memory_table_name,
partition_table_name = partition_table_name,
))
def create_ddb_memory_table(self, memory_table_name, capacity):
self.ddb_sess.run("""
// 先创建一个空的内存表用来表征结构如果无需插入数据capacity可以设为10
{memory_table_name} = table({capacity}:0, {col_names}, [{col_types}]);
""".format(
memory_table_name = memory_table_name,
capacity = capacity,
col_names = '`' + '`'.join(self.daily_kline_cols),
col_types = ', '.join(self.daily_kline_col_types)
))
def dump_daily_kline_to_ddb(self):
# 先创建一个分区表,然后再逐个股票的数据插入
# 1. 需要额外控制在插入第一个股票数据的时候创建分区表比较麻烦
# 2. python程序中的dataframe直接上传到dolphindb内存表不需要考虑内存表字段类型分区表中设置好即可
memory_table_name = 'daily_kline_mem'
partition_table_name = 'daily_kline'
self.create_ddb_memory_table(memory_table_name, 10)
print('Did create ddb memory table.')
pprint(self.ddb_sess.run(f"schema({memory_table_name})"))
self.create_ddb_partition_table(memory_table_name, partition_table_name)
print('Did create ddb partition table.')
pprint(self.ddb_sess.run(f"schema({partition_table_name})"))
with self.mssql_engine.connect() as conn:
stat = "select distinct [StockID] from [StockDaily].dbo.[DailyKLine]"
stock_id_list = list(conn.execute(stat).fetchall())
with tqdm(stock_id_list) as pbar:
for (stock_id,) in pbar:
pbar.set_description(f"Will work on {stock_id}")
#pbar.set_description(f"Will fetch all data of {stock_id} from SQL Server")
stat = """
select * from [StockDaily].dbo.[DailyKLine]
where StockID='{stock_id}'
""".format(
stock_id = stock_id
)
row_list = list(conn.execute(stat).fetchall())
num_rows = len(row_list)
#pbar.set_description(f"Will work on dumping job on {stock_id} of len {num_rows}")
df = pd.DataFrame(row_list)
df['date'] = DDBLoader.make_date(df['date'])
df['StockID'] = DDBLoader.tscode_to_windcode(df['StockID'])
self.ddb_sess.upload({memory_table_name : df})
#print('Did upload dataframe to ddb.')
#pprint(self.ddb_sess.run(f"schema({memory_table_name})"))
#break
self.ddb_sess.run(f"{partition_table_name}.tableInsert({memory_table_name})")
class DDBHFTLoader(DDBLoader):
"""
0. 从sql-server中读取calendar数据并创建成员变量df_calendardf_calendar可以保存在本地pickle作为缓存
|- `def make_calendar_df(self) -> df_calendar`
1. 创建ddb中的数据库分区性质从calendar数据中获取
|- `def create_ddb_database(self, df_calendar) -> void`
|- `def load_ddb_database(self) -> void`
2. 在ddb数据库中创建calendar表
|- `def create_ddb_calendar(self, df_calendar) -> void`
3. 创建ddb的分布式表结构
|- `create_ddb_partition_table(self, hft_type_name)`
|- `_make_table_skeleton(self, hft_type_name, capacity) -> memory_table_name`
4. 从sql server的高频数据转录到dolpindb数据库中
|- `dump_hft_to_ddb(self, type_name, stock_id, trade_date=None)`
"""
hft_type_list = ['KLine', 'Order', 'Tick', 'TickQueue', 'Transe']
protobuff_name_dict = {
name : f"{name}Message_pb2" for name in hft_type_list
}
protobuff_module_dict = {
type_name : importlib.import_module(f".{module_name}", package='ProtoBuffEntitys')
for type_name, module_name in protobuff_name_dict.items()
}
protobuff_desc_dict = {
type_name : eval(f"ProtoBuffEntitys.{module_name}.{type_name}Array.{type_name}Data.DESCRIPTOR")
for type_name, module_name in protobuff_name_dict.items()
}
mssql_name_dict = {
type_name : (
f"{type_name}" if type_name != 'TickQueue' \
else f"TickQue"
) for type_name in hft_type_list
}
# 数据库路径和数据库名可以不一致
ddb_path = "dfs://hft_stock_ts"
ddb_dbname = "db_stock_ts"
ddb_memory_table_suffix = "Memroy"
ddb_partition_table_suffix = "Partitioned"
# calendar表不需要分区因此需要创建一个新的数据库
# 该数据库可以是一个简单的csv现在还不清楚两者的差别
#ddb_calendar_path = "dfs://daily_calendar"
#ddb_calendar_dbname = "db_calendar"
ddb_calendar_table_name = "Calendar"
col_type_mapping = {
'code' : 'SYMBOL',
'm_nDate' : 'DATE',
'm_nTime' : 'TIME',
1 : 'FLOAT',
3 : 'INT',
5 : 'INT',
13 : 'INT',
}
# this value may be used by factor makers, which may loop through code partitions
num_code_partition = 50
num_workers = 8
default_table_capacity = 10000
ddb_dump_journal_fname = '../assets/ddb_dump_journal.csv'
def init_ddb_database(self, df_calendar):
"""
1. 创建ddb_database
2. 创建calendar表
3. 创建数据分区表
"""
# df_calendar还是由外部输入比较方便
#df_calendar = self.make_calendar_df()
self.create_ddb_database(df_calendar)
self.create_ddb_calendar(df_calendar)
for hft_type_name in self.hft_type_list:
self.create_ddb_partition_table(hft_type_name)
def init_ddb_table_data(self, df_calendar, num_workers=None):
"""
对每个股票进行循环转录数据到分区表
"""
stock_list = df_calendar['code'].unique().astype('str')
# 不能重复创建Pool对象因此需要在循环的最外侧创建好Pool对象然后传参进去
with Pool(self.num_workers if num_workers is None else num_workers) as pool:
for hft_type_name in self.hft_type_list:
print('Will work on hft type:', hft_type_name)
with tqdm(stock_list) as pbar:
for stock_id in pbar:
pbar.set_description(f"Working on stock {stock_id}")
self.dump_hft_to_ddb(hft_type_name, stock_id, pbar=pbar, pool=pool)
def _get_stock_date_list(self, cache=False):
"""
Deprecated: This function is replaced by `create_ddb_calendar()`.
"""
if cache:
with open('tmp.pkl', 'rb') as fin:
stock_list, date_list = pickle.load(fin)
else:
with self.mssql_engine.connect() as conn:
# 从KLine表查询主要是因为KLine表最小
stat = "select distinct S_INFO_WINDCODE, TRADE_DT from Level2BytesKline.dbo.KLine"
rs = conn.execute(stat)
stock_date_list = [(stock_name, date) for stock_name, date in rs.fetchall()]
stock_list, date_list = zip(*stock_date_list)
# cache
#with open('tmp.pkl', 'wb') as fout:
# pickle.dump((stock_list, date_list), fout)
return pd.Series(stock_list, dtype='str').unique(), \
pd.Series(date_list, dtype='datetime64[D]').unique()
def create_ddb_database(self, pd_calendar):
# 从`pd_calendar`中创建`stock_list`和`date_list`
stock_list = pd_calendar['code'].unique().astype('str')
date_list = pd_calendar['m_nDate'].unique().astype('datetime64[D]')
# 可以把所有股票高频数据放在一个数据库中不同的表
# 分区策略是跟数据库绑定的,因此需要保证同一个数据库中的表都使用同样的分区额策略
# 对于股票高频数据我们可以使用COMPO的分区策略并且两个子db的分区策略都是VALUE类型的code和m_nDate字段
if self.ddb_sess.existsDatabase(self.ddb_path):
print('Wiil drop database:', self.ddb_path)
self.ddb_sess.dropDatabase(self.ddb_path)
# 要创建一个COMPO分区的数据库需要首先创建两个简单分区的子数据库
# 这里我们使用先按日期,然后按股票分区的子数据库
# Please note that when creating a DFS database with COMPO domain,
# the parameter dbPath for each partition level must be either an empty string or unspecified.
db_date = self.ddb_sess.database('db_date', partitionType=keys.VALUE, partitions=date_list, dbPath='')
# 这里看起来直接使用dolphindb的脚本语句更方便一些
self.ddb_sess.run("""
db_stock = database("", 5, [SYMBOL, {num_code_partition}])
""".format(
num_code_partition = self.num_code_parition
))
#self.ddb_sess.run("""
# db_stock = database("", 1, symbol({partitions}))
#""".format(
# partitions = '`' + '`'.join(stock_list)
#))
self.ddb_sess.run("""
{dbName} = database(
directory = '{dbPath}',
partitionType = COMPO,
partitionScheme = [db_date, db_stock],
engine = "TSDB")
""".format(
dbName = self.ddb_dbname,
dbPath = self.ddb_path
))
self._load_ddb_dump_journal(recreate=True)
def load_ddb_database(self):
db_date = self.ddb_sess.database('db_date', dbPath='')
db_stock = self.ddb_sess.database('db_stock', dbPath='')
self.ddb_sess.run("{dbName} = database(directory='{dbPath}')".format(
dbName = self.ddb_dbname,
dbPath = self.ddb_path
))
self._load_ddb_dump_journal()
def _load_ddb_dump_journal(self, recreate=False):
if recreate or not Path(self.ddb_dump_journal_fname).exists():
print('Will create new dump journal.')
self.dump_journal_writer = open(self.ddb_dump_journal_fname, 'w')
self.dump_journal_writer.write("type_name,stock_id,status\n")
self.dump_journal_writer.flush()
else:
print('Will load previous dump journal.')
self.dump_journal_writer = open(self.ddb_dump_journal_fname, 'a')
self.dump_journal_df = pd.read_csv(self.ddb_dump_journal_fname)
self.dump_journal_df.set_index(['type_name', 'stock_id', 'status'], inplace=True)
# 因为dump_journal_df只会在创建的时候载入一次数据之后不会在写入因此可以在此时对index进行排序
self.dump_journal_df.sort_index(inplace=True)
print('Did load the dump journal, shape', self.dump_journal_df.shape)
#pprint(self.dump_journal_df.head())
def create_ddb_calendar(self, df_calendar):
mem_table = self.ddb_calendar_table_name + self.ddb_memory_table_suffix
per_table = self.ddb_calendar_table_name
# 1. 创建临时内存表
# calendar的行数大概是股票数量 * 交易日数量
self.ddb_sess.run("""
{table_name} = table({capacity}:0, {col_names}, [{col_types}]);
""".format(
table_name = mem_table,
capacity = 5000 * 1000,
col_names = '`code`m_nDate',
col_types = "SYMBOL, DATE"
))
print('Did create the memory table')
# 2. 向内存表中插入所有(code, date)数据
appender = ddb.tableAppender(tableName=mem_table, ddbSession=self.ddb_sess)
num = appender.append(df_calendar)
print('Did append calendar data into ddb memory table, return code', num)
# 3. 创建持久化表格之前需要先根据路径创建一个database对象
# 但研究了一下发现好像一个database里面可以同时存在分区表和非分区表
# 所以在这里暂时就不创建新的database了
# 但因为原database设置成了TSDB所以必须在createTable的时候指定sortKey
#self.ddb_sess.run("""
# {db_name} =
#""")
# 4. 直接从内存表创建一个持久化表格
if self.ddb_sess.existsTable(self.ddb_path, per_table):
self.ddb_sess.dropTable(self.ddb_path, per_table)
self.ddb_sess.run("""
tableInsert(createTable(
dbHandle={ddb_dbname},
table={mem_table},
tableName=`{per_table},
sortColumns=`code`m_nDate,
compressMethods={{"m_nDate":"delta"}}
), {mem_table})
""".format(
ddb_dbname = self.ddb_dbname,
mem_table = mem_table,
per_table = per_table
))
print('Did create the persistent table with the memory table')
def make_calendar_df(self):
print('Will create calendar dataframe from SQL Server')
# 从KLine表查询主要是因为KLine表最小
with self.mssql_engine.connect() as conn:
stat = "select distinct S_INFO_WINDCODE, TRADE_DT from Level2BytesKline.dbo.KLine"
rs = conn.execute(stat)
stock_date_list = [(stock_name, date) for stock_name, date in rs.fetchall()]
df_calendar = pd.DataFrame(stock_date_list, columns=['code', 'm_nDate'])
df_calendar['m_nDate'] = make_date(df_calendar['m_nDate'])
print('Did make the DataFrame for calendar')
return df_calendar
def _make_table_skeleton(self, hft_type_name, table_capacity=default_table_capacity):
def _make_tbl_config(field_list):
"""
根据ProtoBuffEntity对象的Descriptor.fields创建ddb标准的列名列表和列类型列表
"""
col_name_list, col_type_list = [], []
for desc in field_list:
col_name_list.append(desc.name)
# 如果对列明有特殊设定,目前仅包括`code`m_nDate和`m_nTime三个字段
if desc.name in self.col_type_mapping:
col_type_list.append(self.col_type_mapping[desc.name])
# 通过对ProtoBuffEntity的类型编号映射到ddb的类型编号
# 如果默认值是一个数组那么ddb类型要额外增加说明是数组
# ProtoBuffEntity的类型编号只针对基本类型数组需要通过`default_value`来判断
else:
col_type = self.col_type_mapping[desc.type]
if isinstance(desc.default_value, list):
col_type += '[]'
col_type_list.append(col_type)
return col_name_list, col_type_list
desc_obj = self.protobuff_desc_dict[hft_type_name]
col_name_list, col_type_list = _make_tbl_config(desc_obj.fields)
table_name = hft_type_name + self.ddb_memory_table_suffix
print('-' * 80)
print('Will create table structure:', table_name)
self.ddb_sess.run("""
{table_name} = table({capacity}:0, {col_names}, [{col_types}]);
""".format(
table_name = table_name,
capacity = table_capacity,
col_names = '`' + '`'.join(col_name_list),
col_types = ', '.join([f"'{type_name}'" for type_name in col_type_list])
))
res = self.ddb_sess.run(f"schema({table_name}).colDefs")
pprint(res)
print('-' * 80)
return table_name
def create_ddb_partition_table(self, hft_type_name):
memory_table_name = self._make_table_skeleton(hft_type_name, 10)
partition_table_name = hft_type_name + self.ddb_partition_table_suffix
print('-' * 80)
print('Will create partitioned table:', partition_table_name)
self.ddb_sess.run("""
{ddb_dbname}.createPartitionedTable(
table = {memory_table_name},
tableName = `{partition_table_name},
partitionColumns = `m_nDate`code,
sortColumns = `code`m_nDate`m_nTime,
compressMethods = {{m_nDate:"delta", m_nTime:"delta"}}
)
""".format(
ddb_dbname = self.ddb_dbname,
memory_table_name = memory_table_name,
partition_table_name = partition_table_name
))
res = self.ddb_sess.run(f"schema(loadTable('{self.ddb_path}', '{partition_table_name}')).colDefs")
pprint(res)
print('-' * 80)
def dump_hft_to_ddb(self, type_name, stock_id, trade_date=None, pbar=None, pool=None):
if (type_name, stock_id, 'OK') in self.dump_journal_df.index:
message = f"Wiil skip ({type_name}, {stock_id}) as it appears in the dump journal."
if pbar is None:
print(message)
else:
pbar.set_description(message)
return
self.dump_journal_writer.write(f"{type_name},{stock_id},START\n")
self.dump_journal_writer.flush()
# 经过尝试按个股来做batch查询效率还是可以接受的
# mssql中索引字段是(S_INFO_WINDCODE, TRADE_DT)
with self.mssql_engine.connect() as conn:
stat = """
select * from [Level2Bytes{mssql_type_name}].dbo.[{mssql_type_name}]
where S_INFO_WINDCODE='{stock_id}'
""".format(
mssql_type_name = self.mssql_name_dict[type_name],
stock_id = stock_id
)
row_list = list(conn.execute(stat).fetchall())
num_rows = len(row_list)
if pbar:
#pbar.set_description(f"Did get the result set for stock {stock_id} from mssql")
pbar.set_description(f"Will work in paralle on dumping job on {stock_id} of len {num_rows}")
else:
print(f"Did get the result set for stock {stock_id} from mssql")
# 每一行是当个个股某一日的所有高频交易信息
# 使用多进程来加快速度
#with Pool(self.num_workers if num_workers is None else num_workers) as pool:
if pool is None:
print("Will create new Pool object, but this is not encourage for large batch work.")
pool = Pool(self.num_worker)
# 在单个股票内部,对不同日期进行并行处理,对内存使用较为友好,不需要同时载入多个股票海量的全历史数据
with tqdm(total=num_rows, leave=False) as sub_pbar:
for _ in pool.imap_unordered(
functools.partial(
dump_stock_daily_to_ddb,
type_name = type_name,
stock_id = stock_id
),
row_list
):
sub_pbar.update()
self.dump_journal_writer.write(f"{type_name},{stock_id},OK\n")
self.dump_journal_writer.flush()
@staticmethod
def make_stock_daily_df(blob, type_name, stock_id):
"""
用于做多进程录入ddb的函数
"""
blob = gzip.decompress(blob)
dataArray = eval(f"ProtoBuffEntitys.{type_name}Message_pb2.{type_name}Array()")
dataArray.ParseFromString(blob)
data_dict_list = [
{field.name : val for field, val in entry.ListFields()}
for entry in dataArray.dataArray
]
array_type_list = [
field.name
for field, val in dataArray.dataArray[0].ListFields()
if isinstance(field.default_value, list)
]
#pprint(array_type_list)
df = pd.DataFrame(data_dict_list)
#df['code'] = make_symbol(df['code'])
df['code'] = stock_id
df['m_nDate'] = make_date(df['m_nDate'])
df['m_nTime'] = df['m_nDate'] + make_time(df['m_nTime'])
for field_name in array_type_list:
df[field_name] = make_nparray(df[field_name])
#print(f"Did create ddb table for dataframe of shape {df.shape}")
# self.make_table_skeleton(type_name, df.shape[0])
return df
@staticmethod
def dump_stock_daily_to_ddb(row, type_name, stock_id):
"""
用于做多进程录入ddb的函数
"""
df_table_name = type_name
df = make_stock_daily_df(row[2], type_name, stock_id)
ddb_sess = ddb.session(DDBLoader.ddb_config['host'], 8848)
ddb_sess.login(DDBLoader.ddb_config['username'], DDBLoader.ddb_config['password'])
ddb_sess.upload({df_table_name : df})
# 因为在做Tick数据的时候偶然发生'CHUNK[xxx] does not exist.',所以在这里使用`append!`函数代换一下试试
ddb_sess.run("append!(loadTable('{dbPath}', `{partitioned_table_name}), {df_table_name})".format(
#ddb_sess.run("tableInsert(loadTable('{dbPath}', `{partitioned_table_name}), {df_table_name})".format(
dbPath = DDBLoader.ddb_path,
partitioned_table_name = type_name + DDBLoader.ddb_partition_table_suffix,
df_table_name = df_table_name
))
def main():
# TODO:
# 可以使用`Fire`库,对函数调用再做一次封装,就可以避免每次运行不同参数时候需要修改内部多处的代码。
# PIT基本面数据
loader = DDBPITLoader()
loader.create_ddb_database()
#loader.create_ddb_partition_tables()
loader.dump_pit_to_ddb()
# 日频行情数据
#loader = DDBDailyLoader()
#loader.load_ddb_database()
#loader.dump_daily_kline_to_ddb()
# 高频数据
#df_calendar = loader.make_calendar_df()
#loader.init_ddb_database(df_calendar)
#print('Did finish init_ddb_database')
#loader.load_ddb_database()
#print('Did load ddb database')
#loader.init_ddb_table_data(df_calendar)
#print('Did finish init_table_data')
if __name__ == '__main__':
main()

@ -0,0 +1,3 @@
from DDBBase import DDBBase
__all__ = ['DDBBase']

@ -0,0 +1,159 @@
from pprint import pprint
from pathlib import Path
from tqdm import tqdm
#from tqdm.contrib.concurrent import process_map
import numpy as np
import pandas as pd
import dolphindb as ddb
import dolphindb.settings as keys
import sqlalchemy as sa
from .DDBLoader import DDBLoader
class DDBBasicInfoLoader(DDBLoader):
ddb_path = "dfs://info_stock_ts"
ddb_dbname = "ddb_info_stock_ts"
# 这些映射表似乎没有什么用,每张表手动做一下表结构和插入动作就可以了
table_name_mapping = {
'IndustryInfo' : 'info_industry',
'ListAndDelist' : 'info_list_delist'
}
date_col_set = {
'EnterDate',
'ExitDate'
'ListDate',
'DelistDate'
}
ddb_type_mapping = {
'float' : 'DOUBLE',
'int' : 'INT',
'text' : 'STRING',
'varchar' : 'STRING',
'str' : 'STRING'
}
# 基本面数据库现在存放在91服务器之上
mssql_config = {
'host' : '192.168.1.7',
'username' : 'sa',
'password' : 'passw0rd!',
'dbname' : 'StockInfo'
}
def __init__(self):
super().__init__()
# 重新设定mssql_engine对象此时我们需要使用基本面数据库
self.mssql_engine = sa.create_engine(
"mssql+pyodbc://{username}:{password}@{host}/{dbname}?driver=ODBC+Driver+18+for+SQL+Server".format(**self.mssql_config),
connect_args = {
"TrustServerCertificate": "yes"
}, echo=False
)
def create_ddb_database(self):
self.ddb_sess.run("""
{dbName} = database(
directory = '{dbPath}',
partitionType = HASH,
partitionScheme = [SYMBOL, 10],
engine = 'TSDB'
)
""".format(
dbName = self.ddb_dbname,
dbPath = self.ddb_path,
))
def _create_info_industry_table(self):
with self.mssql_engine.connect() as conn:
stat = "select * from IndustryInfo"
row_list = list(conn.execute(stat).fetchall())
capacity = len(row_list)
df = pd.DataFrame(row_list)
df['StockID'] = self.tscode_to_windcode(df['StockID'])
df['EnterDate'] = self.make_date(df['EnterDate'])
df['ExitDate'] = self.make_date(df['ExitDate'])
self.ddb_sess.run(f"""
tbl = table(
{capacity}:0,
`code`industry_code`industry_name`industry_level`enter_date`exit_date`is_new`industry_class,
[SYMBOL, SYMBOL, SYMBOL, INT, DATE, DATE, INT, SYMBOL]
);
""")
self.ddb_sess.run("tableInsert{tbl}", df)
self.ddb_sess.run(f"""
dropTable({self.ddb_dbname}, "info_industry");
info_industry = createPartitionedTable(
dbHandle = {self.ddb_dbname},
table = tbl,
tableName = "info_industry",
partitionColumns='code',
compressMethods = {{'enter_date' : 'delta', 'exit_date' : 'delta'}},
sortColumns = ['code', 'enter_date', 'exit_date']
);
info_industry.tableInsert(tbl);
""")
def _create_info_list_delist_table(self):
with self.mssql_engine.connect() as conn:
stat = "select * from ListAndDelist"
row_list = list(conn.execute(stat).fetchall())
capacity = len(row_list)
df = pd.DataFrame(row_list)
df['StockID'] = self.tscode_to_windcode(df['StockID'])
df['ListDate'] = self.make_date(df['ListDate'])
df['DelistDate'] = self.make_date(df['DelistDate'])
self.ddb_sess.run(f"""
tbl = table(
{capacity}:0,
`code`list_date`delist_date,
[SYMBOL, DATE, DATE]
);
""")
self.ddb_sess.run("tableInsert{tbl}", df)
self.ddb_sess.run(f"""
dropTable({self.ddb_dbname}, "info_list_delist");
info_list_delist = createPartitionedTable(
dbHandle = {self.ddb_dbname},
table = tbl,
tableName = "info_list_delist",
partitionColumns='code',
compressMethods = {{'list_date' : 'delta', 'delist_date' : 'delta'}},
sortColumns = ['code', 'list_date', 'delist_date']
);
info_list_delist.tableInsert(tbl);
""")
def create_ddb_partition_table(self):
"""创建分区表"""
self._create_info_industry_table()
self._create_info_list_delist_table()

@ -0,0 +1,160 @@
import pickle
import functools
import abc
import warnings
from pprint import pprint
from pathlib import Path
from tqdm import tqdm
#from tqdm.contrib.concurrent import process_map
from multiprocessing import Pool
import numpy as np
import pandas as pd
from pandas.core.common import SettingWithCopyWarning
warnings.simplefilter(action="ignore", category=SettingWithCopyWarning)
import dolphindb as ddb
import dolphindb.settings as keys
import sqlalchemy as sa
import ProtoBuffEntitys
class DDBDailyLoader(DDBLoader):
ddb_path = "dfs://daily_stock_ts"
ddb_dbname = "db_daily_stock_ts"
daily_kline_cols = [
'code', 'm_nDate',
'open', 'high', 'low', 'close', 'vol',
'amount', 'cjbs', 'yclose',
'PctChg', 'IsZt', 'IsDt', 'IsST', 'IsGoDelist',
'FloatShares', 'MarketValues',
'factor'
]
daily_kline_col_types = [
'SYMBOL', 'DATE',
'DOUBLE', 'DOUBLE', 'DOUBLE', 'DOUBLE', 'DOUBLE',
'DOUBLE', 'INT', 'DOUBLE',
'DOUBLE', 'INT', 'INT', 'INT', 'INT',
'DOUBLE', 'DOUBLE',
'DOUBLE'
]
def create_ddb_database(self):
# TODO: daily数据库已经在DDBDailyFactor中被创建了
# 后续可以迁移过来,不过现在就暂时先不管了
pass
def load_ddb_database(self):
self.ddb_sess.run("""
{dbName} = database(directory='{dbPath}')
""".format(
dbName = self.ddb_dbname,
dbPath = self.ddb_path
))
print('Did load database from', self.ddb_path)
def create_ddb_partition_table(self, memory_table_name, partition_table_name):
# TODO: 现在只做一个日频行情数据表,今后可能考虑把基本面数据也迁移过来
# 由于日频行情数据的表结构相对简单,所以直接把表结构写在这里代码里即可
# 搬迁数据的时候需要考虑按照逐个股票来搬迁,以免造成对内存的巨大压力
self.ddb_sess.run("""
// 确保删除原表
if (existsTable("{ddb_daily_path}", "{partition_table_name}")) {{
dropTable({ddb_daily_dbname}, "{partition_table_name}");
}}
// 然后根据内存表的结构创建持久化的分区表
{partition_table_name} = {ddb_daily_dbname}.createPartitionedTable(
table = {memory_table_name},
tableName = "{partition_table_name}",
partitionColumns = `code,
sortColumns = `code`m_nDate,
compressMethods = {{m_nDate:"delta"}}
);
""".format(
ddb_daily_path = self.ddb_path,
ddb_daily_dbname = self.ddb_dbname,
memory_table_name = memory_table_name,
partition_table_name = partition_table_name,
))
def create_ddb_memory_table(self, memory_table_name, capacity):
self.ddb_sess.run("""
// 先创建一个空的内存表用来表征结构如果无需插入数据capacity可以设为10
{memory_table_name} = table({capacity}:0, {col_names}, [{col_types}]);
""".format(
memory_table_name = memory_table_name,
capacity = capacity,
col_names = '`' + '`'.join(self.daily_kline_cols),
col_types = ', '.join(self.daily_kline_col_types)
))
def dump_daily_kline_to_ddb(self):
# 先创建一个分区表,然后再逐个股票的数据插入
# 1. 需要额外控制在插入第一个股票数据的时候创建分区表比较麻烦
# 2. python程序中的dataframe直接上传到dolphindb内存表不需要考虑内存表字段类型分区表中设置好即可
memory_table_name = 'daily_kline_mem'
partition_table_name = 'daily_kline'
self.create_ddb_memory_table(memory_table_name, 10)
print('Did create ddb memory table.')
pprint(self.ddb_sess.run(f"schema({memory_table_name})"))
self.create_ddb_partition_table(memory_table_name, partition_table_name)
print('Did create ddb partition table.')
pprint(self.ddb_sess.run(f"schema({partition_table_name})"))
with self.mssql_engine.connect() as conn:
stat = "select distinct [StockID] from [StockDaily].dbo.[DailyKLine]"
stock_id_list = list(conn.execute(stat).fetchall())
with tqdm(stock_id_list) as pbar:
for (stock_id,) in pbar:
pbar.set_description(f"Will work on {stock_id}")
#pbar.set_description(f"Will fetch all data of {stock_id} from SQL Server")
stat = """
select * from [StockDaily].dbo.[DailyKLine]
where StockID='{stock_id}'
""".format(
stock_id = stock_id
)
row_list = list(conn.execute(stat).fetchall())
num_rows = len(row_list)
#pbar.set_description(f"Will work on dumping job on {stock_id} of len {num_rows}")
df = pd.DataFrame(row_list)
df['date'] = DDBLoader.make_date(df['date'])
df['StockID'] = DDBLoader.tscode_to_windcode(df['StockID'])
self.ddb_sess.upload({memory_table_name : df})
#print('Did upload dataframe to ddb.')
#pprint(self.ddb_sess.run(f"schema({memory_table_name})"))
#break
self.ddb_sess.run(f"{partition_table_name}.tableInsert({memory_table_name})")
def main():
# TODO:
# 可以使用`Fire`库,对函数调用再做一次封装,就可以避免每次运行不同参数时候需要修改内部多处的代码。
# 日频行情数据
loader = DDBDailyLoader()
loader.load_ddb_database()
#loader.dump_daily_kline_to_ddb()
if __name__ == '__main__':
main()

@ -0,0 +1,484 @@
import importlib
import gzip
import pickle
import functools
import abc
import warnings
from pprint import pprint
from pathlib import Path
from tqdm import tqdm
#from tqdm.contrib.concurrent import process_map
from multiprocessing import Pool
import sqlalchemy as sa
import numpy as np
import pandas as pd
from pandas.core.common import SettingWithCopyWarning
warnings.simplefilter(action="ignore", category=SettingWithCopyWarning)
import dolphindb as ddb
import dolphindb.settings as keys
from .ProtoBuffEntitys import KLineMessage_pb2, OrderMessage_pb2, TickMessage_pb2, TickQueueMessage_pb2, TranseMessage_pb2
from .DDBLoader import DDBLoader
class DDBHFTLoader(DDBLoader):
"""
0. 从sql-server中读取calendar数据并创建成员变量df_calendardf_calendar可以保存在本地pickle作为缓存
|- `def make_calendar_df(self) -> df_calendar`
1. 创建ddb中的数据库分区性质从calendar数据中获取
|- `def create_ddb_database(self, df_calendar) -> void`
|- `def load_ddb_database(self) -> void`
2. 在ddb数据库中创建calendar表
|- `def create_ddb_calendar(self, df_calendar) -> void`
3. 创建ddb的分布式表结构
|- `create_ddb_partition_table(self, hft_type_name)`
|- `_make_table_skeleton(self, hft_type_name, capacity) -> memory_table_name`
4. 从sql server的高频数据转录到dolpindb数据库中
|- `dump_hft_to_ddb(self, type_name, stock_id, trade_date=None)`
"""
hft_type_list = ['KLine', 'Order', 'Tick', 'TickQueue', 'Transe']
protobuff_name_dict = {
name : f"{name}Message_pb2" for name in hft_type_list
}
protobuff_module_dict = {
type_name : importlib.import_module(f".{module_name}", package='loader.ProtoBuffEntitys')
for type_name, module_name in protobuff_name_dict.items()
}
protobuff_desc_dict = {
type_name : eval(f"{module_name}.{type_name}Array.{type_name}Data.DESCRIPTOR")
for type_name, module_name in protobuff_name_dict.items()
}
mssql_name_dict = {
type_name : (
f"{type_name}" if type_name != 'TickQueue' \
else f"TickQue"
) for type_name in hft_type_list
}
# 数据库路径和数据库名可以不一致
ddb_path = "dfs://hft_stock_ts"
ddb_dbname = "db_stock_ts"
ddb_memory_table_suffix = "Memroy"
ddb_partition_table_suffix = "Partitioned"
# calendar表不需要分区因此需要创建一个新的数据库
# 该数据库可以是一个简单的csv现在还不清楚两者的差别
#ddb_calendar_path = "dfs://daily_calendar"
#ddb_calendar_dbname = "db_calendar"
ddb_calendar_table_name = "Calendar"
col_type_mapping = {
'code' : 'SYMBOL',
'm_nDate' : 'DATE',
'm_nTime' : 'TIME',
1 : 'FLOAT',
3 : 'INT',
5 : 'INT',
13 : 'INT',
}
# this value may be used by factor makers, which may loop through code partitions
num_code_partition = 50
num_workers = 8
default_table_capacity = 10000
ddb_dump_journal_fname = '../assets/ddb_dump_journal.csv'
def init_ddb_database(self, df_calendar):
"""
1. 创建ddb_database
2. 创建calendar表
3. 创建数据分区表
"""
# df_calendar还是由外部输入比较方便
#df_calendar = self.make_calendar_df()
self.create_ddb_database(df_calendar)
self.create_ddb_calendar(df_calendar)
for hft_type_name in self.hft_type_list:
self.create_ddb_partition_table(hft_type_name)
def init_ddb_table_data(self, df_calendar, num_workers=None):
"""
对每个股票进行循环转录数据到分区表
"""
stock_list = df_calendar['code'].unique().astype('str')
# 不能重复创建Pool对象因此需要在循环的最外侧创建好Pool对象然后传参进去
with Pool(self.num_workers if num_workers is None else num_workers) as pool:
for hft_type_name in self.hft_type_list:
print('Will work on hft type:', hft_type_name)
with tqdm(stock_list) as pbar:
for stock_id in pbar:
pbar.set_description(f"Working on stock {stock_id}")
self.dump_hft_to_ddb(hft_type_name, stock_id, pbar=pbar, pool=pool)
def _get_stock_date_list(self, cache=False):
"""
Deprecated: This function is replaced by `create_ddb_calendar()`.
"""
if cache:
with open('tmp.pkl', 'rb') as fin:
stock_list, date_list = pickle.load(fin)
else:
with self.mssql_engine.connect() as conn:
# 从KLine表查询主要是因为KLine表最小
stat = "select distinct S_INFO_WINDCODE, TRADE_DT from Level2BytesKline.dbo.KLine"
rs = conn.execute(stat)
stock_date_list = [(stock_name, date) for stock_name, date in rs.fetchall()]
stock_list, date_list = zip(*stock_date_list)
# cache
#with open('tmp.pkl', 'wb') as fout:
# pickle.dump((stock_list, date_list), fout)
return pd.Series(stock_list, dtype='str').unique(), \
pd.Series(date_list, dtype='datetime64[D]').unique()
def create_ddb_database(self, pd_calendar):
# 从`pd_calendar`中创建`stock_list`和`date_list`
stock_list = pd_calendar['code'].unique().astype('str')
date_list = pd_calendar['m_nDate'].unique().astype('datetime64[D]')
# 可以把所有股票高频数据放在一个数据库中不同的表
# 分区策略是跟数据库绑定的,因此需要保证同一个数据库中的表都使用同样的分区额策略
# 对于股票高频数据我们可以使用COMPO的分区策略并且两个子db的分区策略都是VALUE类型的code和m_nDate字段
if self.ddb_sess.existsDatabase(self.ddb_path):
print('Wiil drop database:', self.ddb_path)
self.ddb_sess.dropDatabase(self.ddb_path)
# 要创建一个COMPO分区的数据库需要首先创建两个简单分区的子数据库
# 这里我们使用先按日期,然后按股票分区的子数据库
# Please note that when creating a DFS database with COMPO domain,
# the parameter dbPath for each partition level must be either an empty string or unspecified.
db_date = self.ddb_sess.database('db_date', partitionType=keys.VALUE, partitions=date_list, dbPath='')
# 这里看起来直接使用dolphindb的脚本语句更方便一些
self.ddb_sess.run("""
db_stock = database("", 5, [SYMBOL, {num_code_partition}])
""".format(
num_code_partition = self.num_code_parition
))
#self.ddb_sess.run("""
# db_stock = database("", 1, symbol({partitions}))
#""".format(
# partitions = '`' + '`'.join(stock_list)
#))
self.ddb_sess.run("""
{dbName} = database(
directory = '{dbPath}',
partitionType = COMPO,
partitionScheme = [db_date, db_stock],
engine = "TSDB")
""".format(
dbName = self.ddb_dbname,
dbPath = self.ddb_path
))
self._load_ddb_dump_journal(recreate=True)
def load_ddb_database(self):
db_date = self.ddb_sess.database('db_date', dbPath='')
db_stock = self.ddb_sess.database('db_stock', dbPath='')
self.ddb_sess.run("{dbName} = database(directory='{dbPath}')".format(
dbName = self.ddb_dbname,
dbPath = self.ddb_path
))
self._load_ddb_dump_journal()
def _load_ddb_dump_journal(self, recreate=False):
if recreate or not Path(self.ddb_dump_journal_fname).exists():
print('Will create new dump journal.')
self.dump_journal_writer = open(self.ddb_dump_journal_fname, 'w')
self.dump_journal_writer.write("type_name,stock_id,status\n")
self.dump_journal_writer.flush()
else:
print('Will load previous dump journal.')
self.dump_journal_writer = open(self.ddb_dump_journal_fname, 'a')
self.dump_journal_df = pd.read_csv(self.ddb_dump_journal_fname)
self.dump_journal_df.set_index(['type_name', 'stock_id', 'status'], inplace=True)
# 因为dump_journal_df只会在创建的时候载入一次数据之后不会在写入因此可以在此时对index进行排序
self.dump_journal_df.sort_index(inplace=True)
print('Did load the dump journal, shape', self.dump_journal_df.shape)
#pprint(self.dump_journal_df.head())
def create_ddb_calendar(self, df_calendar):
mem_table = self.ddb_calendar_table_name + self.ddb_memory_table_suffix
per_table = self.ddb_calendar_table_name
# 1. 创建临时内存表
# calendar的行数大概是股票数量 * 交易日数量
self.ddb_sess.run("""
{table_name} = table({capacity}:0, {col_names}, [{col_types}]);
""".format(
table_name = mem_table,
capacity = 5000 * 1000,
col_names = '`code`m_nDate',
col_types = "SYMBOL, DATE"
))
print('Did create the memory table')
# 2. 向内存表中插入所有(code, date)数据
appender = ddb.tableAppender(tableName=mem_table, ddbSession=self.ddb_sess)
num = appender.append(df_calendar)
print('Did append calendar data into ddb memory table, return code', num)
# 3. 创建持久化表格之前需要先根据路径创建一个database对象
# 但研究了一下发现好像一个database里面可以同时存在分区表和非分区表
# 所以在这里暂时就不创建新的database了
# 但因为原database设置成了TSDB所以必须在createTable的时候指定sortKey
#self.ddb_sess.run("""
# {db_name} =
#""")
# 4. 直接从内存表创建一个持久化表格
if self.ddb_sess.existsTable(self.ddb_path, per_table):
self.ddb_sess.dropTable(self.ddb_path, per_table)
self.ddb_sess.run("""
tableInsert(createTable(
dbHandle={ddb_dbname},
table={mem_table},
tableName=`{per_table},
sortColumns=`code`m_nDate,
compressMethods={{"m_nDate":"delta"}}
), {mem_table})
""".format(
ddb_dbname = self.ddb_dbname,
mem_table = mem_table,
per_table = per_table
))
print('Did create the persistent table with the memory table')
def make_calendar_df(self):
print('Will create calendar dataframe from SQL Server')
# 从KLine表查询主要是因为KLine表最小
with self.mssql_engine.connect() as conn:
stat = "select distinct S_INFO_WINDCODE, TRADE_DT from Level2BytesKline.dbo.KLine"
rs = conn.execute(stat)
stock_date_list = [(stock_name, date) for stock_name, date in rs.fetchall()]
df_calendar = pd.DataFrame(stock_date_list, columns=['code', 'm_nDate'])
df_calendar['m_nDate'] = self.make_date(df_calendar['m_nDate'])
print('Did make the DataFrame for calendar')
return df_calendar
def _make_table_skeleton(self, hft_type_name, table_capacity=default_table_capacity):
def _make_tbl_config(field_list):
"""
根据ProtoBuffEntity对象的Descriptor.fields创建ddb标准的列名列表和列类型列表
"""
col_name_list, col_type_list = [], []
for desc in field_list:
col_name_list.append(desc.name)
# 如果对列明有特殊设定,目前仅包括`code`m_nDate和`m_nTime三个字段
if desc.name in self.col_type_mapping:
col_type_list.append(self.col_type_mapping[desc.name])
# 通过对ProtoBuffEntity的类型编号映射到ddb的类型编号
# 如果默认值是一个数组那么ddb类型要额外增加说明是数组
# ProtoBuffEntity的类型编号只针对基本类型数组需要通过`default_value`来判断
else:
col_type = self.col_type_mapping[desc.type]
if isinstance(desc.default_value, list):
col_type += '[]'
col_type_list.append(col_type)
return col_name_list, col_type_list
desc_obj = self.protobuff_desc_dict[hft_type_name]
col_name_list, col_type_list = _make_tbl_config(desc_obj.fields)
table_name = hft_type_name + self.ddb_memory_table_suffix
print('-' * 80)
print('Will create table structure:', table_name)
self.ddb_sess.run("""
{table_name} = table({capacity}:0, {col_names}, [{col_types}]);
""".format(
table_name = table_name,
capacity = table_capacity,
col_names = '`' + '`'.join(col_name_list),
col_types = ', '.join([f"'{type_name}'" for type_name in col_type_list])
))
res = self.ddb_sess.run(f"schema({table_name}).colDefs")
pprint(res)
print('-' * 80)
return table_name
def create_ddb_partition_table(self, hft_type_name):
memory_table_name = self._make_table_skeleton(hft_type_name, 10)
partition_table_name = hft_type_name + self.ddb_partition_table_suffix
print('-' * 80)
print('Will create partitioned table:', partition_table_name)
self.ddb_sess.run("""
{ddb_dbname}.createPartitionedTable(
table = {memory_table_name},
tableName = `{partition_table_name},
partitionColumns = `m_nDate`code,
sortColumns = `code`m_nDate`m_nTime,
compressMethods = {{m_nDate:"delta", m_nTime:"delta"}}
)
""".format(
ddb_dbname = self.ddb_dbname,
memory_table_name = memory_table_name,
partition_table_name = partition_table_name
))
res = self.ddb_sess.run(f"schema(loadTable('{self.ddb_path}', '{partition_table_name}')).colDefs")
pprint(res)
print('-' * 80)
def dump_hft_to_ddb(self, type_name, stock_id, trade_date=None, pbar=None, pool=None):
if (type_name, stock_id, 'OK') in self.dump_journal_df.index:
message = f"Will skip ({type_name}, {stock_id}) as it appears in the dump journal."
if pbar is None:
print(message)
else:
pbar.set_description(message)
return
elif (type_name, stock_id, 'START') in self.dump_journal_df.index:
# 任务已经开始,但是没有完全结束,就需要逐个检查日期,跳过已经录入输入的交易日数据
# 同时也不会再往日志文件中写入START记录
_journal_dt = self.ddb_sess.run(f"""
select m_nDate from tbl where code='{stock_id}' group by m_nDate map;
""").set_index('m_nDate')
else:
_journal_dt = None
self.dump_journal_writer.write(f"{type_name},{stock_id},START\n")
self.dump_journal_writer.flush()
# 经过尝试按个股来做batch查询效率还是可以接受的
# mssql中索引字段是(S_INFO_WINDCODE, TRADE_DT)
with self.mssql_engine.connect() as conn:
stat = """
select * from [Level2Bytes{mssql_type_name}].dbo.[{mssql_type_name}]
where S_INFO_WINDCODE='{stock_id}'
""".format(
mssql_type_name = self.mssql_name_dict[type_name],
stock_id = stock_id
)
row_list = list(conn.execute(stat).fetchall())
# 如果`_journal_dt`不为空,则说明之前的日志中表明改股票数据已经部分完成,需要逐个核对日期
# 这里只把日期值不再`_journal_dt`的记录放入`row_list`
if _journal_dt is not None:
row_list = [row for row in row_list
if pd.to_datetime(row[1]) not in _journal_dt]
print(f"Resume job for {stock_id}, with {len(row_list)} rows left.")
num_rows = len(row_list)
# 如果行数为0则说明是空数据可以直接返回
if num_rows == 0:
return
if pbar:
#pbar.set_description(f"Did get the result set for stock {stock_id} from mssql")
pbar.set_description(f"Will work in paralle on dumping job on {stock_id} of len {num_rows}")
else:
print(f"Did get the result set for stock {stock_id} from mssql")
# 每一行是当个个股某一日的所有高频交易信息
# 使用多进程来加快速度
#with Pool(self.num_workers if num_workers is None else num_workers) as pool:
if pool is None:
print("Will create new Pool object, but this is not encourage for large batch work.")
pool = Pool(self.num_worker)
# 在单个股票内部,对不同日期进行并行处理,对内存使用较为友好,不需要同时载入多个股票海量的全历史数据
with tqdm(total=num_rows, leave=False) as sub_pbar:
for _ in pool.imap_unordered(
functools.partial(
dump_stock_daily_to_ddb,
type_name = type_name,
stock_id = stock_id
),
row_list
):
sub_pbar.update()
self.dump_journal_writer.write(f"{type_name},{stock_id},OK\n")
self.dump_journal_writer.flush()
@staticmethod
def make_stock_daily_df(blob, type_name, stock_id):
"""
用于做多进程录入ddb的函数
"""
blob = gzip.decompress(blob)
dataArray = eval(f"ProtoBuffEntitys.{type_name}Message_pb2.{type_name}Array()")
dataArray.ParseFromString(blob)
data_dict_list = [
{field.name : val for field, val in entry.ListFields()}
for entry in dataArray.dataArray
]
array_type_list = [
field.name
for field, val in dataArray.dataArray[0].ListFields()
if isinstance(field.default_value, list)
]
#pprint(array_type_list)
df = pd.DataFrame(data_dict_list)
#df['code'] = make_symbol(df['code'])
df['code'] = stock_id
df['m_nDate'] = self.make_date(df['m_nDate'])
df['m_nTime'] = df['m_nDate'] + self.make_time(df['m_nTime'])
for field_name in array_type_list:
df[field_name] = self.make_nparray(df[field_name])
#print(f"Did create ddb table for dataframe of shape {df.shape}")
# self.make_table_skeleton(type_name, df.shape[0])
return df
@staticmethod
def dump_stock_daily_to_ddb(row, type_name, stock_id):
"""
用于做多进程录入ddb的函数
"""
df_table_name = type_name
df = make_stock_daily_df(row[2], type_name, stock_id)
ddb_sess = ddb.session(DDBLoader.ddb_config['host'], 8848)
ddb_sess.login(DDBLoader.ddb_config['username'], DDBLoader.ddb_config['password'])
ddb_sess.upload({df_table_name : df})
# 因为在做Tick数据的时候偶然发生'CHUNK[xxx] does not exist.',所以在这里使用`append!`函数代换一下试试
ddb_sess.run("append!(loadTable('{dbPath}', `{partitioned_table_name}), {df_table_name})".format(
#ddb_sess.run("tableInsert(loadTable('{dbPath}', `{partitioned_table_name}), {df_table_name})".format(
dbPath = DDBLoader.ddb_path,
partitioned_table_name = type_name + DDBLoader.ddb_partition_table_suffix,
df_table_name = df_table_name
))

@ -0,0 +1,100 @@
import importlib
import pickle
import functools
import abc
from pprint import pprint
from pathlib import Path
import numpy as np
import pandas as pd
import dolphindb as ddb
import dolphindb.settings as keys
import sqlalchemy as sa
from DDBBase import DDBBase
class DDBLoader(DDBBase):
"""
- 放了几个公用的配置字段包括
1. SQL-Server的链接参数
2. DolphinDB的链接参数
- 放了几个@abstractmethod在里面不过如果不需要使用多态特性那应该用处不大
1. create_ddb_database
2. create_ddb_partition_table
"""
mssql_config = {
'host' : '192.168.1.7',
'username' : 'sa',
'password' : 'passw0rd!'
}
def __init__(self):
super().__init__()
self.mssql_engine = sa.create_engine(
"mssql+pyodbc://{username}:{password}@{host}/master?driver=ODBC+Driver+18+for+SQL+Server".format(**self.mssql_config),
connect_args = {
"TrustServerCertificate": "yes"
}, echo=False
)
@abc.abstractmethod
def create_ddb_database(self, *args, **kwargs):
"""
创建database函数需要被子类具体实现
"""
return
@abc.abstractmethod
def create_ddb_partition_table(self, *args, **kwargs):
"""
创建分区表函数需要被子类具体实现
"""
return
@staticmethod
def tscode_to_windcode(series):
return series.apply(lambda x : x[2:] + '.' + x[:2])
@staticmethod
def make_symbol(series):
return series.astype('int32').astype('str')\
.apply(str.zfill, args=(6,))\
.apply(lambda code : \
code + '.SH' if code[0] == '6' \
else code + '.SZ')
@staticmethod
def make_date(series):
# 特别是对于分红表如果某些关键日期还未公布则会填充0导致日期解析失败
series.loc[series == 0] = np.nan
return pd.to_datetime(
series.astype(str), format='%Y%m%d')
@staticmethod
def make_nparray(series):
return series.apply(lambda x : np.array(x))
@staticmethod
def make_time(series):
s_hr = series // 10000000 * 3600000
s_min = series % 10000000 // 100000 * 60000
s_sec = series % 100000 // 1000
s_ms = series % 1000
return pd.to_timedelta(s_hr + s_min + s_sec + s_ms, unit='ms')

@ -0,0 +1,287 @@
import warnings
from pprint import pprint
from pathlib import Path
from tqdm import tqdm
import numpy as np
import pandas as pd
from pandas.core.common import SettingWithCopyWarning
warnings.simplefilter(action="ignore", category=SettingWithCopyWarning)
import dolphindb as ddb
import dolphindb.settings as keys
import sqlalchemy as sa
from DDBLoader import DDBLoader
class DDBPITLoader(DDBLoader):
ddb_path = "dfs://pit_stock_ts"
ddb_dbname = "ddb_pit_stock_ts"
num_code_partition = 50
table_name_mapping = {
#'CBS_AFTER_ADJ' : 'bs_common_adj',
#'CBS_BEFORE_ADJ' : 'bs_common_ori',
#'CCFS_AFTER_ADJ' : 'cfs_common_adj',
#'CCFS_BEFORE_ADJ' : 'cfs_common_ori',
#'CIS_AFTER_ADJ' : 'is_common_adj',
#'CIS_BEFORE_ADJ' : 'is_common_ori',
'DIV_WIND' : 'divident',
#'EP_WIND' : 'earnings_preannouncement',
#'PEE_WIND' : 'preliminary_earnings_estimate'
}
meta_col_config = {
'WIND_CODE' : ('code', 'SYMBOL'),
# mssql表中不需要记录的meta字段在这里直接设置为None
'IntCode' : None,
'ACTUAL_ANN_DT' : None,
'ReportPeriod' : ('report_period', 'DATE'),
'AppearInPeriod' : ('appear_in_period', 'DATE'),
'AppearAtDate' : ('appear_at_date', 'DATE')
}
date_col_set = {
'report_period',
'appear_in_period',
'appear_at_date',
'ReportPeriod',
'AppearInPeriod',
'AppearAtDate',
'EQY_RECORD_DT',
'EX_DT',
'DVD_PAYOUT_DT',
'S_DIV_PRELANDATE',
'S_DIV_SMTGDATE',
'DVD_ANN_DT',
'S_DIV_PREANNDT'
}
ddb_type_mapping = {
'float' : 'DOUBLE',
'int' : 'INT',
'text' : 'STRING',
'varchar' : 'STRING',
'str' : 'STRING'
}
# 基本面数据库现在存放在91服务器之上
mssql_config = {
'host' : '192.168.1.91',
'username' : 'sa',
'password' : 'xn.123',
'dbname' : 'tr_statement'
}
def __init__(self):
super().__init__()
# 重新设定mssql_engine对象此时我们需要使用基本面数据库
self.mssql_engine = sa.create_engine(
"mssql+pyodbc://{username}:{password}@{host}/{dbname}?driver=ODBC+Driver+18+for+SQL+Server".format(**self.mssql_config),
connect_args = {
"TrustServerCertificate": "yes"
}, echo=False
)
def create_ddb_database(self):
self.ddb_sess.run("""
{dbName} = database(
directory = '{dbPath}',
partitionType = HASH,
partitionScheme = [SYMBOL, {num_code_partition}],
engine = 'TSDB'
)
""".format(
dbName = self.ddb_dbname,
dbPath = self.ddb_path,
num_code_partition = self.num_code_partition
))
def _make_col_config(self, mssql_table_name):
"""
Return:
mssql_col_name_list, ddb_col_name_list, ddb_col_type_list
"""
with self.mssql_engine.connect() as conn:
col_sp_list = list(conn.execute(f"exec sp_columns {mssql_table_name}").fetchall())
mssql_col_name_list, ddb_col_name_list, ddb_col_type_list = \
[], [], []
for col_sp in col_sp_list:
_col_name = col_sp[3]
_col_type = col_sp[5]
# 对于meta字段需要根据meta配置表来进行处理
if _col_name in self.meta_col_config:
# 跳过mssql表中 不需要记录的meta字段
if self.meta_col_config[_col_name] is None:
continue
# 字段名和字段类型都要进行映射
mssql_col_name_list.append(_col_name)
ddb_col_name_list.append(self.meta_col_config[_col_name][0])
ddb_col_type_list.append(self.meta_col_config[_col_name][1])
# 对于非meta字段仅需要检查是否是float类型对于float类型设置类型为DOUBLE
else:
# 需要之后被转换成DATE的字段一般在原表中为为INT类型
if _col_name in self.date_col_set:
mssql_col_name_list.append(_col_name)
ddb_col_name_list.append(_col_name)
ddb_col_type_list.append('DATE')
# 按照对照表进行类型转换
elif _col_type in self.ddb_type_mapping:
mssql_col_name_list.append(_col_name)
ddb_col_name_list.append(_col_name)
ddb_col_type_list.append(self.ddb_type_mapping[_col_type])
# 对照表中没有的字段类型,就不加入到字段列表中了
else:
print(f"!**Unrecognized type '{_col_type}' for column {_col_name}, will skip.")
return mssql_col_name_list, ddb_col_name_list, ddb_col_type_list
def create_ddb_partition_table(self, mssql_table_name):
"""创建分区表"""
memory_table_name = mssql_table_name
partition_table_name = self.table_name_mapping[mssql_table_name]
mssql_col_name_list, ddb_col_name_list, ddb_col_type_list = \
self._make_col_config(mssql_table_name)
# 根据是否
if 'appear_in_period' in ddb_col_name_list:
compress_methods = """{
'report_period' : 'delta',
'appear_in_period' : 'delta',
'appear_at_date' : 'delta'
}"""
else:
compress_methods = """{
'report_period' : 'delta',
'appear_at_date' : 'delta'
}"""
# 因为已经根据`appear_in_period`分列了调整前和调整后,因此不需要对它再进行排序了
sort_columns = "`code`report_period`appear_at_date"
# 1. 先创建内存表,内存表中设定好列名和列类型
# 2. 然后根据内存表创建分区表,设定分区列等信息
self.ddb_sess.run("""
{memory_table_name} = table(
{capacity}:0,
{column_name_list},
[{column_type_list}]
);
if (existsTable("{ddb_path}", "{partition_table_name}")) {{
dropTable({ddb_dbname}, "{partition_table_name}");
}}
{partition_table_name} = createPartitionedTable(
dbHandle = {ddb_dbname},
table = {memory_table_name},
tableName = "{partition_table_name}",
partitionColumns = 'code',
compressMethods = {compress_methods},
sortColumns = {sort_columns}
);
""".format(
ddb_dbname = self.ddb_dbname,
ddb_path = self.ddb_path,
memory_table_name = memory_table_name,
partition_table_name = partition_table_name,
capacity = 10,
column_name_list = '`' + '`'.join(ddb_col_name_list),
column_type_list = ','.join(ddb_col_type_list),
compress_methods = compress_methods.replace('\n', '').replace(' ', ''),
sort_columns = sort_columns
))
print('-' * 80)
print(f"Did create parition table <{partition_table_name}>:")
pprint(self.ddb_sess.run(f"schema({partition_table_name});"))
return partition_table_name, mssql_col_name_list
def create_ddb_partition_tables(self):
for mssql_table_name in self.table_name_mapping:
self.create_ddb_partition_table(mssql_table_name)
def _dump_pit_to_ddb(self, mssql_table_name):
print('Will work on table', mssql_table_name)
# 返回的`mssql_col_name_list`可以用来对SQL-Server获取的dataframe进行列过滤
partition_table_name, mssql_col_name_list = \
self.create_ddb_partition_table(mssql_table_name)
with self.mssql_engine.connect() as conn:
stat = f"select distinct [WIND_CODE] from {mssql_table_name}"
stock_id_list = list(conn.execute(stat).fetchall())
with tqdm(stock_id_list) as pbar:
for (stock_id,) in pbar:
pbar.set_description(f"Will work on {stock_id}")
#pbar.set_description(f"Will fetch all data of {stock_id} from SQL Server")
stat = """
select * from {mssql_table_name}
where WIND_CODE='{stock_id}' and AppearAtDate>0
""".format(
mssql_table_name = mssql_table_name,
stock_id = stock_id
)
row_list = list(conn.execute(stat).fetchall())
num_rows = len(row_list)
# 因为对AppearAtDate做了过滤所以有可能得到一个空的数据集
if num_rows == 0:
print(f"Will skip {stock_id} due to empty result.")
continue
#pbar.set_description(f"Will work on dumping job on {stock_id} of len {num_rows}")
# 这里需要对select语句获取的所有列进行一次过滤以保证和partition table中的列一致
df = pd.DataFrame(row_list)[mssql_col_name_list]
# 需要把部分字段的int字段类型转换成DATE字段类型
for df_col in df.columns:
if df_col in self.date_col_set:
df[df_col] = DDBLoader.make_date(df[df_col])
# 因为在做数据库View的时候已经做过一轮转换了所以这里就不需要再次转换了
#df['WIND_CODE'] = DDBLoader.tscode_to_windcode(df['WIND_CODE'])
self.ddb_sess.upload({mssql_table_name : df})
self.ddb_sess.run(f"{partition_table_name}.tableInsert({mssql_table_name})")
def dump_pit_to_ddb(self):
for mssql_table_name in self.table_name_mapping:
self._dump_pit_to_ddb(mssql_table_name)
def main():
# TODO:
# 可以使用`Fire`库,对函数调用再做一次封装,就可以避免每次运行不同参数时候需要修改内部多处的代码。
# PIT基本面数据
loader = DDBPITLoader()
loader.create_ddb_database()
#loader.create_ddb_partition_tables()
loader.dump_pit_to_ddb()
# 日频行情数据
#loader = DDBDailyLoader()
#loader.load_ddb_database()
#loader.dump_daily_kline_to_ddb()
if __name__ == '__main__':
main()

@ -0,0 +1,4 @@
# import KLineMessage_pb2, OrderMessage_pb2, TickMessage_pb2
# __all__ = ['KLineMessage_pb2', 'OrderMessage_pb2', 'TickMessage_pb2']

@ -0,0 +1,31 @@
from loader.DDBHFTLoader import DDBHFTLoader
from loader.DDBBasicInfoLoader import DDBBasicInfoLoader
def main():
# TODO:
# 可以使用`Fire`库,对函数调用再做一次封装,就可以避免每次运行不同参数时候需要修改内部多处的代码。
#loader = DDBBasicInfoLoader()
#loader.create_ddb_database()
#loader.create_ddb_partition_table()
# 高频数据
loader = DDBHFTLoader()
df_calendar = loader.make_calendar_df()
#loader.init_ddb_database(df_calendar)
#print('Did finish init_ddb_database')
loader.load_ddb_database()
print('Did load ddb database')
loader.init_ddb_table_data(df_calendar)
print('Did finish init_table_data')
if __name__ == '__main__':
main()
Loading…
Cancel
Save