From 0ca31dcd47ad80da3ff6884295f4d247f3855821 Mon Sep 17 00:00:00 2001 From: Guofu Li Date: Wed, 24 Aug 2022 14:40:33 +0800 Subject: [PATCH] 1. Re-organize the directory structure, creating sub-packages for `DDBLoader`. 2. Update `DDBHFTLoader` to better handle the journal file. 3. Move the entrance scripts to `src/run.py`. --- dolphindb/.workspace | 4 +- ipynb/ddb.ipynb | 657 +++++++++++- ipynb/ddb_pe.ipynb | 333 ++++++ ipynb/mssql.ipynb | 8 +- src/DDBBase.py | 2 +- src/DDBLoader.py | 950 ------------------ src/__init__.py | 3 + src/{ => expr}/DDBExpression.py | 0 src/{ => factor}/DDBFactor.py | 0 src/loader/DDBBasicInfoLoader.py | 159 +++ src/loader/DDBDailyLoader.py | 160 +++ src/loader/DDBHFTLoader.py | 484 +++++++++ src/loader/DDBLoader.py | 100 ++ src/loader/DDBPITLoader.py | 287 ++++++ .../HFDataTableMessage_pb2.py | 0 .../IndexFutureKLineMessage_pb2.py | 0 .../IndexFutureL1TickMessage_pb2.py | 0 .../ProtoBuffEntitys/IndexKLineMessage_pb2.py | 0 .../ProtoBuffEntitys/IndexTickMessage_pb2.py | 0 .../ProtoBuffEntitys/KLineMessage_pb2.py | 0 .../ProtoBuffEntitys/OrderMessage_pb2.py | 0 .../ProtoBuffEntitys/TickMessage_pb2.py | 0 .../ProtoBuffEntitys/TickQueueMessage_pb2.py | 0 .../ProtoBuffEntitys/TranseMessage_pb2.py | 0 src/loader/ProtoBuffEntitys/__init__.py | 4 + src/{ => loader}/make_hft.py | 0 src/run.py | 31 + 27 files changed, 2200 insertions(+), 982 deletions(-) create mode 100644 ipynb/ddb_pe.ipynb delete mode 100644 src/DDBLoader.py create mode 100644 src/__init__.py rename src/{ => expr}/DDBExpression.py (100%) rename src/{ => factor}/DDBFactor.py (100%) create mode 100644 src/loader/DDBBasicInfoLoader.py create mode 100644 src/loader/DDBDailyLoader.py create mode 100644 src/loader/DDBHFTLoader.py create mode 100644 src/loader/DDBLoader.py create mode 100644 src/loader/DDBPITLoader.py rename src/{ => loader}/ProtoBuffEntitys/HFDataTableMessage_pb2.py (100%) rename src/{ => loader}/ProtoBuffEntitys/IndexFutureKLineMessage_pb2.py (100%) rename src/{ => loader}/ProtoBuffEntitys/IndexFutureL1TickMessage_pb2.py (100%) rename src/{ => loader}/ProtoBuffEntitys/IndexKLineMessage_pb2.py (100%) rename src/{ => loader}/ProtoBuffEntitys/IndexTickMessage_pb2.py (100%) rename src/{ => loader}/ProtoBuffEntitys/KLineMessage_pb2.py (100%) rename src/{ => loader}/ProtoBuffEntitys/OrderMessage_pb2.py (100%) rename src/{ => loader}/ProtoBuffEntitys/TickMessage_pb2.py (100%) rename src/{ => loader}/ProtoBuffEntitys/TickQueueMessage_pb2.py (100%) rename src/{ => loader}/ProtoBuffEntitys/TranseMessage_pb2.py (100%) create mode 100644 src/loader/ProtoBuffEntitys/__init__.py rename src/{ => loader}/make_hft.py (100%) create mode 100644 src/run.py diff --git a/dolphindb/.workspace b/dolphindb/.workspace index f205732..d24ffac 100755 --- a/dolphindb/.workspace +++ b/dolphindb/.workspace @@ -12,5 +12,5 @@ Open,/Users/guofu/Workspaces/dolphindb/test2/scripts/pit_series_report_period_at Server,.167,192.168.1.167,8848,,admin,123456 Server,.7,192.168.1.7,8848,,admin,123456 Server,local8848,localhost,8848,,, -ActiveFile,/Users/guofu/Workspaces/dolphindb/test2/scripts/pit6.dos -ActiveServer,.167 +ActiveFile,/Users/guofu/Workspaces/dolphindb/test2/scripts/test2.dos +ActiveServer,.7 diff --git a/ipynb/ddb.ipynb b/ipynb/ddb.ipynb index e04a63e..e7669e7 100644 --- a/ipynb/ddb.ipynb +++ b/ipynb/ddb.ipynb @@ -48,25 +48,130 @@ }, { "cell_type": "code", - "execution_count": 68, - "id": "8b7dae3d-aef1-4c50-92b2-460d4fea0a96", + "execution_count": 32, + "id": "c8d07fc8-d80c-490f-9220-0d3e8e4c72a4", "metadata": {}, "outputs": [ { "data": { "text/plain": [ - "114350" + "50" ] }, - "execution_count": 68, + "execution_count": 32, "metadata": {}, "output_type": "execute_result" } ], "source": [ + "sess = ddb.session('localhost', 8848)\n", + "sess.login('admin', '123456')\n", + "\n", + "# backup(backup_path, sql_obj, force, parallel)\n", + "code = \"\"\"\n", + " backup('/data/dolphindb/backup/', , true, false);\n", + " \n", + " backup('/data/dolphindb/backup/', , true, false);\n", + " \n", + " backup('/data/dolphindb/backup/', , true, false);\n", + " \n", + " backup('/data/dolphindb/backup/', , true, false);\n", + " backup('/data/dolphindb/backup/', , true, false);\n", + "\"\"\")" + ] + }, { "cell_type": "code", "execution_count": 69, @@ -1113,7 +1218,7 @@ }, { "cell_type": "code", - "execution_count": 27, + "execution_count": 29, "id": "d68ea326-82c3-4a7c-97cf-c04dd8aee56b", "metadata": {}, "outputs": [ @@ -1144,18 +1249,18 @@ " \n", " \n", " 0\n", - " 0\n", + " 1468390810\n", " \n", " \n", "\n", "" ], "text/plain": [ - " sum_cnt\n", - "0 0" + " sum_cnt\n", + "0 1468390810" ] }, - "execution_count": 27, + "execution_count": 29, "metadata": {}, "output_type": "execute_result" } @@ -1171,32 +1276,327 @@ }, { "cell_type": "code", - "execution_count": 28, + "execution_count": 38, "id": "7cbc4906-7756-424a-9ce5-9d2b6d1bab4b", "metadata": {}, "outputs": [ { - "ename": "RuntimeError", - "evalue": " in run: Server response: 'tbl = loadTable(\"dfs://hft_stock_ts\", \"TickPartitioned\") => getFileBlocksMeta on path '/hft_stock_ts/TickPartitioned.tbl' failed, reason: path does not exist' script: '\ntbl = loadTable(\"dfs://hft_stock_ts\", 'TickPartitioned');\nselect sum(cnt) from (select count(*) as cnt from tbl map);\n'", - "output_type": "error", - "traceback": [ - "\u001b[0;31m---------------------------------------------------------------------------\u001b[0m", - "\u001b[0;31mRuntimeError\u001b[0m Traceback (most recent call last)", - "Input \u001b[0;32mIn [28]\u001b[0m, in \u001b[0;36m\u001b[0;34m()\u001b[0m\n\u001b[1;32m 1\u001b[0m sess \u001b[38;5;241m=\u001b[39m ddb\u001b[38;5;241m.\u001b[39msession(\u001b[38;5;124m'\u001b[39m\u001b[38;5;124m192.168.1.167\u001b[39m\u001b[38;5;124m'\u001b[39m, \u001b[38;5;241m8848\u001b[39m)\n\u001b[1;32m 2\u001b[0m sess\u001b[38;5;241m.\u001b[39mlogin(\u001b[38;5;124m'\u001b[39m\u001b[38;5;124madmin\u001b[39m\u001b[38;5;124m'\u001b[39m, \u001b[38;5;124m'\u001b[39m\u001b[38;5;124m123456\u001b[39m\u001b[38;5;124m'\u001b[39m)\n\u001b[0;32m----> 3\u001b[0m \u001b[43msess\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mrun\u001b[49m\u001b[43m(\u001b[49m\u001b[38;5;124;43m\"\"\"\u001b[39;49m\n\u001b[1;32m 4\u001b[0m \u001b[38;5;124;43mtbl = loadTable(\u001b[39;49m\u001b[38;5;124;43m\"\u001b[39;49m\u001b[38;5;124;43mdfs://hft_stock_ts\u001b[39;49m\u001b[38;5;124;43m\"\u001b[39;49m\u001b[38;5;124;43m, \u001b[39;49m\u001b[38;5;124;43m'\u001b[39;49m\u001b[38;5;124;43mTickPartitioned\u001b[39;49m\u001b[38;5;124;43m'\u001b[39;49m\u001b[38;5;124;43m);\u001b[39;49m\n\u001b[1;32m 5\u001b[0m \u001b[38;5;124;43mselect sum(cnt) from (select count(*) as cnt from tbl map);\u001b[39;49m\n\u001b[1;32m 6\u001b[0m \u001b[38;5;124;43m\"\"\"\u001b[39;49m\u001b[43m)\u001b[49m\n", - "File \u001b[0;32m~/.venv/tinysoft/lib/python3.8/site-packages/dolphindb/session.py:161\u001b[0m, in \u001b[0;36msession.run\u001b[0;34m(self, script, *args, **kwargs)\u001b[0m\n\u001b[1;32m 159\u001b[0m \u001b[38;5;28;01mif\u001b[39;00m \u001b[38;5;124m\"\u001b[39m\u001b[38;5;124mfetchSize\u001b[39m\u001b[38;5;124m\"\u001b[39m \u001b[38;5;129;01min\u001b[39;00m kwargs\u001b[38;5;241m.\u001b[39mkeys():\n\u001b[1;32m 160\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m BlockReader(\u001b[38;5;28mself\u001b[39m\u001b[38;5;241m.\u001b[39mcpp\u001b[38;5;241m.\u001b[39mrunBlock(script, \u001b[38;5;241m*\u001b[39m\u001b[38;5;241m*\u001b[39mkwargs))\n\u001b[0;32m--> 161\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m \u001b[38;5;28;43mself\u001b[39;49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mcpp\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mrun\u001b[49m\u001b[43m(\u001b[49m\u001b[43mscript\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[38;5;241;43m*\u001b[39;49m\u001b[43margs\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[38;5;241;43m*\u001b[39;49m\u001b[38;5;241;43m*\u001b[39;49m\u001b[43mkwargs\u001b[49m\u001b[43m)\u001b[49m\n", - "\u001b[0;31mRuntimeError\u001b[0m: in run: Server response: 'tbl = loadTable(\"dfs://hft_stock_ts\", \"TickPartitioned\") => getFileBlocksMeta on path '/hft_stock_ts/TickPartitioned.tbl' failed, reason: path does not exist' script: '\ntbl = loadTable(\"dfs://hft_stock_ts\", 'TickPartitioned');\nselect sum(cnt) from (select count(*) as cnt from tbl map);\n'" - ] + "data": { + "text/html": [ + "
\n", + "\n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
sum_cnt
04991926704
\n", + "
" + ], + "text/plain": [ + " sum_cnt\n", + "0 4991926704" + ] + }, + "execution_count": 38, + "metadata": {}, + "output_type": "execute_result" } ], "source": [ - "sess = ddb.session('192.168.1.167', 8848)\n", + "sess = ddb.session('192.168.1.7', 8848)\n", "sess.login('admin', '123456')\n", "sess.run(\"\"\"\n", - "tbl = loadTable(\"dfs://hft_stock_ts\", 'TickPartitioned');\n", - "select sum(cnt) from (select count(*) as cnt from tbl map);\n", + " tbl = loadTable(\"dfs://hft_stock_ts\", 'TickPartitioned');\n", + " select sum(cnt) from (select count(*) as cnt from tbl map);\n", + "\"\"\")" + ] + }, + { + "cell_type": "code", + "execution_count": 39, + "id": "e9ab5e57-dce5-4426-9bac-4238cd067197", + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "
\n", + "\n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
sum_cnt
09388749
\n", + "
" + ], + "text/plain": [ + " sum_cnt\n", + "0 9388749" + ] + }, + "execution_count": 39, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "sess.run(\"\"\"\n", + " select sum(cnt) from (select count(*) as cnt from tbl where code='002182.SZ' map);\n", + "\"\"\")" + ] + }, + { + "cell_type": "code", + "execution_count": 43, + "id": "4ba45027-bbb5-4b27-99da-3452cc8d2f1c", + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "
\n", + "\n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
count
02298
\n", + "
" + ], + "text/plain": [ + " count\n", + "0 2298" + ] + }, + "execution_count": 43, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "sess.run(\"\"\"\n", + " select count(*) from (\n", + " select code, m_nDate, count(*) as cnt from tbl where code='002182.SZ' group by code, m_nDate map\n", + " );\n", "\"\"\")" ] }, + { + "cell_type": "code", + "execution_count": 57, + "id": "29ab8af5-e571-4064-b691-a186d9fb4d08", + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "
\n", + "\n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
m_nDate
2013-01-04
2013-01-07
2013-01-08
2013-01-09
2013-01-10
...
2022-07-04
2022-07-05
2022-07-06
2022-07-07
2022-07-08
\n", + "

2298 rows × 0 columns

\n", + "
" + ], + "text/plain": [ + "Empty DataFrame\n", + "Columns: []\n", + "Index: [2013-01-04 00:00:00, 2013-01-07 00:00:00, 2013-01-08 00:00:00, 2013-01-09 00:00:00, 2013-01-10 00:00:00, 2013-01-11 00:00:00, 2013-01-14 00:00:00, 2013-01-15 00:00:00, 2013-01-16 00:00:00, 2013-01-17 00:00:00, 2013-01-18 00:00:00, 2013-01-21 00:00:00, 2013-01-22 00:00:00, 2013-01-23 00:00:00, 2013-01-24 00:00:00, 2013-01-25 00:00:00, 2013-01-28 00:00:00, 2013-01-29 00:00:00, 2013-01-30 00:00:00, 2013-01-31 00:00:00, 2013-02-01 00:00:00, 2013-02-04 00:00:00, 2013-02-05 00:00:00, 2013-02-06 00:00:00, 2013-02-07 00:00:00, 2013-02-08 00:00:00, 2013-02-18 00:00:00, 2013-02-19 00:00:00, 2013-02-20 00:00:00, 2013-02-21 00:00:00, 2013-02-22 00:00:00, 2013-02-25 00:00:00, 2013-02-26 00:00:00, 2013-02-27 00:00:00, 2013-02-28 00:00:00, 2013-03-01 00:00:00, 2013-03-04 00:00:00, 2013-03-05 00:00:00, 2013-03-06 00:00:00, 2013-03-07 00:00:00, 2013-03-08 00:00:00, 2013-03-11 00:00:00, 2013-03-12 00:00:00, 2013-03-13 00:00:00, 2013-03-14 00:00:00, 2013-03-15 00:00:00, 2013-03-18 00:00:00, 2013-03-19 00:00:00, 2013-03-20 00:00:00, 2013-03-21 00:00:00, 2013-03-22 00:00:00, 2013-03-25 00:00:00, 2013-03-26 00:00:00, 2013-03-27 00:00:00, 2013-03-28 00:00:00, 2013-03-29 00:00:00, 2013-04-01 00:00:00, 2013-04-02 00:00:00, 2013-04-03 00:00:00, 2013-04-08 00:00:00, 2013-04-09 00:00:00, 2013-04-10 00:00:00, 2013-04-11 00:00:00, 2013-04-12 00:00:00, 2013-04-15 00:00:00, 2013-04-16 00:00:00, 2013-04-17 00:00:00, 2013-04-18 00:00:00, 2013-04-19 00:00:00, 2013-04-22 00:00:00, 2013-04-23 00:00:00, 2013-04-24 00:00:00, 2013-04-25 00:00:00, 2013-04-26 00:00:00, 2013-05-02 00:00:00, 2013-05-03 00:00:00, 2013-05-06 00:00:00, 2013-05-07 00:00:00, 2013-05-08 00:00:00, 2013-05-09 00:00:00, 2013-05-10 00:00:00, 2013-05-13 00:00:00, 2013-05-14 00:00:00, 2013-05-15 00:00:00, 2013-05-16 00:00:00, 2013-05-17 00:00:00, 2013-05-20 00:00:00, 2013-05-21 00:00:00, 2013-05-22 00:00:00, 2013-05-23 00:00:00, 2013-05-24 00:00:00, 2013-05-27 00:00:00, 2013-05-28 00:00:00, 2013-05-29 00:00:00, 2013-05-30 00:00:00, 2013-05-31 00:00:00, 2013-06-03 00:00:00, 2013-06-04 00:00:00, 2013-06-05 00:00:00, 2013-06-06 00:00:00, ...]\n", + "\n", + "[2298 rows x 0 columns]" + ] + }, + "execution_count": 57, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "df = sess.run(\"\"\"\n", + " select m_nDate from tbl where code='002182.SZ' group by m_nDate map;\n", + "\"\"\").set_index('m_nDate')\n", + "df" + ] + }, + { + "cell_type": "code", + "execution_count": 51, + "id": "013d191b-2a41-4d7b-8382-b6ee8764c2d0", + "metadata": {}, + "outputs": [], + "source": [ + "import sqlalchemy as sa\n", + "engine = sa.create_engine(\n", + " 'mssql+pyodbc://sa:passw0rd!@192.168.1.7/master?driver=ODBC+Driver+18+for+SQL+Server',\n", + " connect_args = {\n", + " \"TrustServerCertificate\": \"yes\"\n", + " }, echo=False)" + ] + }, + { + "cell_type": "code", + "execution_count": 54, + "id": "c90e8e9d-cde0-4a7a-afd3-ea771483f001", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "'20130104'" + ] + }, + "execution_count": 54, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "with engine.connect() as conn:\n", + " rows = conn.execute(\"select * from [Level2BytesTick].dbo.[Tick] where S_INFO_WINDCODE='002182.SZ'\").fetchall()\n", + " rows = [row for row in rows]\n", + " " + ] + }, + { + "cell_type": "code", + "execution_count": 58, + "id": "525dfcf1-720a-4b65-90b5-28d0c2118cf5", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "True" + ] + }, + "execution_count": 58, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "import pandas as pd\n", + "dt = pd.to_datetime(rows[0][1], format='%Y%m%d')\n", + "dt in df.index" + ] + }, { "cell_type": "code", "execution_count": 31, @@ -1231,14 +1631,221 @@ "\t} while (del_date <= 2022.12.31)\n", "}\n", "\"\"\"\n", - "sess.run(code)" + "# 这段代码运行好赶紧注释掉,非常危险\n", + "# sess.run(code)" ] }, { "cell_type": "code", - "execution_count": null, + "execution_count": 37, "id": "ea0501e7-d416-45ce-add5-e443c55f158c", "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "
\n", + "\n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
codeindustry_codeindustry_nameindustry_levelenter_dateexit_dateis_newindustry_class
0000017.SZSWHY310000申万交运设备11992-03-312011-10-100SWHY
1000017.SZSWHY310300申万非汽车交运设备21992-03-312011-10-100SWHY
2000017.SZSWHY310301申万摩托车31992-03-312011-10-100SWHY
3000017.SZSWHY310000申万交运设备12011-10-102014-02-210SWHY
4000017.SZSWHY310300申万非汽车交运设备22011-10-102014-02-210SWHY
...........................
95000157.SZSWHY260202申万工程机械32000-10-122014-02-210SWHY
96000157.SZSWHY640000申万机械设备12014-02-212021-12-130SWHY
97000157.SZSWHY640200申万专用设备22014-02-212021-12-130SWHY
98000157.SZSWHY640201申万工程机械32014-02-212021-12-130SWHY
99000157.SZSWHY640000申万机械设备12021-12-13NaT1SWHY
\n", + "

100 rows × 8 columns

\n", + "
" + ], + "text/plain": [ + " code industry_code industry_name industry_level enter_date \\\n", + "0 000017.SZ SWHY310000 申万交运设备 1 1992-03-31 \n", + "1 000017.SZ SWHY310300 申万非汽车交运设备 2 1992-03-31 \n", + "2 000017.SZ SWHY310301 申万摩托车 3 1992-03-31 \n", + "3 000017.SZ SWHY310000 申万交运设备 1 2011-10-10 \n", + "4 000017.SZ SWHY310300 申万非汽车交运设备 2 2011-10-10 \n", + ".. ... ... ... ... ... \n", + "95 000157.SZ SWHY260202 申万工程机械 3 2000-10-12 \n", + "96 000157.SZ SWHY640000 申万机械设备 1 2014-02-21 \n", + "97 000157.SZ SWHY640200 申万专用设备 2 2014-02-21 \n", + "98 000157.SZ SWHY640201 申万工程机械 3 2014-02-21 \n", + "99 000157.SZ SWHY640000 申万机械设备 1 2021-12-13 \n", + "\n", + " exit_date is_new industry_class \n", + "0 2011-10-10 0 SWHY \n", + "1 2011-10-10 0 SWHY \n", + "2 2011-10-10 0 SWHY \n", + "3 2014-02-21 0 SWHY \n", + "4 2014-02-21 0 SWHY \n", + ".. ... ... ... \n", + "95 2014-02-21 0 SWHY \n", + "96 2021-12-13 0 SWHY \n", + "97 2021-12-13 0 SWHY \n", + "98 2021-12-13 0 SWHY \n", + "99 NaT 1 SWHY \n", + "\n", + "[100 rows x 8 columns]" + ] + }, + "execution_count": 37, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "sess.run(\"\"\"\n", + "tbl = loadTable(\"dfs://info_stock_ts\", 'info_industry');\n", + "select top 100 * from tbl map;\n", + "\"\"\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "16dd860d-049f-420e-8d04-d1f8969e5ed1", + "metadata": {}, "outputs": [], "source": [] } diff --git a/ipynb/ddb_pe.ipynb b/ipynb/ddb_pe.ipynb new file mode 100644 index 0000000..aefa5b3 --- /dev/null +++ b/ipynb/ddb_pe.ipynb @@ -0,0 +1,333 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": 1, + "id": "2b46ea32-3e2e-466b-ac6a-9874336551af", + "metadata": {}, + "outputs": [], + "source": [ + "import dolphindb as ddb\n", + "\n", + "sess = ddb.session('localhost', 8848)\n", + "sess.login('admin', '123456')" + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "id": "6cc64a77-a5ad-46b6-8204-b188ee308ad3", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "{'partitionType': 5,\n", + " 'partitionColumnType': 17,\n", + " 'partitionColumnIndex': 0,\n", + " 'chunkPath': None,\n", + " 'colDefs': name typeString typeInt comment\n", + " 0 code SYMBOL 17 \n", + " 1 report_period DATE 6 \n", + " 2 appear_at_date DATE 6 \n", + " 3 S_PROFITNOTICE_STYLE STRING 18 \n", + " 4 S_PROFITNOTICE_CHANGEMIN DOUBLE 16 \n", + " 5 S_PROFITNOTICE_CHANGEMAX DOUBLE 16 \n", + " 6 S_PROFITNOTICE_NETPROFITMIN DOUBLE 16 \n", + " 7 S_PROFITNOTICE_NETPROFITMAX DOUBLE 16 \n", + " 8 S_PROFITNOTICE_REASON STRING 18 ,\n", + " 'chunkGranularity': 'TABLE',\n", + " 'partitionTypeName': 'HASH',\n", + " 'keepDuplicates': 'ALL',\n", + " 'engineType': 'TSDB',\n", + " 'partitionColumnName': 'code',\n", + " 'partitionSchema': 50,\n", + " 'sortColumns': array(['code', 'report_period', 'appear_at_date'], dtype=object),\n", + " 'partitionSites': None}" + ] + }, + "execution_count": 6, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "sess.run(\"tbl_ep = loadTable('dfs://pit_stock_ts', 'earnings_preannouncement')\")\n", + "sess.run(\"schema(tbl_ep)\")" + ] + }, + { + "cell_type": "code", + "execution_count": 10, + "id": "39c05247-deac-4c49-a20d-e19e637c6081", + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "
\n", + "\n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
codereport_periodcnt
0600543.SH2021-12-313
1605499.SH2021-06-303
2603729.SH2020-12-313
3002058.SZ2020-12-313
4601599.SH2020-12-313
............
295600813.SH2002-06-304
296000689.SZ2002-06-305
297000653.SZ2002-06-304
298000658.SZ2002-06-307
299600853.SH2001-12-313
\n", + "

300 rows × 3 columns

\n", + "
" + ], + "text/plain": [ + " code report_period cnt\n", + "0 600543.SH 2021-12-31 3\n", + "1 605499.SH 2021-06-30 3\n", + "2 603729.SH 2020-12-31 3\n", + "3 002058.SZ 2020-12-31 3\n", + "4 601599.SH 2020-12-31 3\n", + ".. ... ... ...\n", + "295 600813.SH 2002-06-30 4\n", + "296 000689.SZ 2002-06-30 5\n", + "297 000653.SZ 2002-06-30 4\n", + "298 000658.SZ 2002-06-30 7\n", + "299 600853.SH 2001-12-31 3\n", + "\n", + "[300 rows x 3 columns]" + ] + }, + "execution_count": 10, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "sess.run(\"\"\"\n", + " select code, report_period, count(*) as cnt from tbl_ep group by code, report_period having count(*) > 2 order by report_period desc \n", + "\"\"\")" + ] + }, + { + "cell_type": "code", + "execution_count": 11, + "id": "74623941-5f6e-48fc-b2a0-7050ae6c6045", + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "
\n", + "\n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
codereport_periodappear_at_dateS_PROFITNOTICE_STYLES_PROFITNOTICE_CHANGEMINS_PROFITNOTICE_CHANGEMAXS_PROFITNOTICE_NETPROFITMINS_PROFITNOTICE_NETPROFITMAXS_PROFITNOTICE_REASON
0600543.SH2021-12-312021-10-30预亏0.00.00.00.01-9月,公司累计营业收入增长39.61%,但较2019年同期仍有明显下滑,目前处于亏损状态...
1600543.SH2021-12-312022-01-29预亏-4939.0-4939.0-9000.0-9000.01、主营业务影响。受新冠肺炎疫情等因素影响,消费市场低迷,流通环节受限、下游需求收缩,公司葡...
2600543.SH2021-12-312022-03-31预亏-5408.0-5408.0-9872.0-9872.0公司财务部门将2021年度计提的固定资产减值准备4830万元,作非经常性损益处理,在审计时,...
\n", + "
" + ], + "text/plain": [ + " code report_period appear_at_date S_PROFITNOTICE_STYLE \\\n", + "0 600543.SH 2021-12-31 2021-10-30 预亏 \n", + "1 600543.SH 2021-12-31 2022-01-29 预亏 \n", + "2 600543.SH 2021-12-31 2022-03-31 预亏 \n", + "\n", + " S_PROFITNOTICE_CHANGEMIN S_PROFITNOTICE_CHANGEMAX \\\n", + "0 0.0 0.0 \n", + "1 -4939.0 -4939.0 \n", + "2 -5408.0 -5408.0 \n", + "\n", + " S_PROFITNOTICE_NETPROFITMIN S_PROFITNOTICE_NETPROFITMAX \\\n", + "0 0.0 0.0 \n", + "1 -9000.0 -9000.0 \n", + "2 -9872.0 -9872.0 \n", + "\n", + " S_PROFITNOTICE_REASON \n", + "0 1-9月,公司累计营业收入增长39.61%,但较2019年同期仍有明显下滑,目前处于亏损状态... \n", + "1 1、主营业务影响。受新冠肺炎疫情等因素影响,消费市场低迷,流通环节受限、下游需求收缩,公司葡... \n", + "2 公司财务部门将2021年度计提的固定资产减值准备4830万元,作非经常性损益处理,在审计时,... " + ] + }, + "execution_count": 11, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "sess.run(\"\"\"\n", + "select * from tbl_ep where code='600543.SH' and report_period=2021.12.31\n", + "\"\"\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "1425df79-bfaf-414a-9a12-f3d7e1231e5e", + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.8.13" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/ipynb/mssql.ipynb b/ipynb/mssql.ipynb index 8301f19..bb3a564 100644 --- a/ipynb/mssql.ipynb +++ b/ipynb/mssql.ipynb @@ -2,7 +2,7 @@ "cells": [ { "cell_type": "code", - "execution_count": 28, + "execution_count": 2, "metadata": {}, "outputs": [], "source": [ @@ -330,20 +330,20 @@ }, { "cell_type": "code", - "execution_count": 53, + "execution_count": 4, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ - "(4708,)\n" + "(2298,)\n" ] } ], "source": [ "with engine.connect() as conn:\n", - " stat = \"select count(*) from Level2BytesKLine.dbo.KLine where TRADE_DT='20220608'\"\n", + " stat = \"select count(*) from Level2BytesTick.dbo.Tick where S_INFO_WINDCODE='002182.SZ'\"\n", " rs = conn.execute(stat)\n", " for row in rs.fetchall():\n", " print(row)" diff --git a/src/DDBBase.py b/src/DDBBase.py index 04f35df..368fb31 100644 --- a/src/DDBBase.py +++ b/src/DDBBase.py @@ -13,6 +13,6 @@ class DDBBase(object): def __init__(self): self.ddb_sess = ddb.session(self.ddb_config['host'], 8848) - self.login(self.ddb_config['username'], self.ddb_config['password']) + self.ddb_sess.login(self.ddb_config['username'], self.ddb_config['password']) diff --git a/src/DDBLoader.py b/src/DDBLoader.py deleted file mode 100644 index 84d6c48..0000000 --- a/src/DDBLoader.py +++ /dev/null @@ -1,950 +0,0 @@ -import importlib -import gzip -import pickle -import functools -import abc -import warnings - -from pprint import pprint -from pathlib import Path -from tqdm import tqdm -#from tqdm.contrib.concurrent import process_map -from multiprocessing import Pool - -import numpy as np -import pandas as pd -from pandas.core.common import SettingWithCopyWarning -warnings.simplefilter(action="ignore", category=SettingWithCopyWarning) - -import dolphindb as ddb -import dolphindb.settings as keys - -import sqlalchemy as sa - -import ProtoBuffEntitys - -from DDBBase import DDBBase - - -class DDBLoader(DDBBase): - """ - - 放了几个公用的配置字段,包括: - 1. SQL-Server的链接参数 - 2. DolphinDB的链接参数 - - - 放了几个@abstractmethod在里面,不过如果不需要使用多态特性,那应该用处不大: - 1. create_ddb_database - 2. create_ddb_partition_table - """ - - mssql_config = { - 'host' : '192.168.1.7', - 'username' : 'sa', - 'password' : 'passw0rd!' - } - - def __init__(self): - super().__init__() - self.mssql_engine = sa.create_engine( - "mssql+pyodbc://{username}:{password}@{host}/master?driver=ODBC+Driver+18+for+SQL+Server".format(**self.mssql_config), - connect_args = { - "TrustServerCertificate": "yes" - }, echo=False - ) - - - @abc.abstractmethod - def create_ddb_database(self, *args, **kwargs): - """ - 创建database函数,需要被子类具体实现。 - """ - return - - @abc.abstractmethod - def create_ddb_partition_table(self, *args, **kwargs): - """ - 创建分区表函数,需要被子类具体实现。 - """ - return - - - @staticmethod - def tscode_to_windcode(series): - return series.apply(lambda x : x[2:] + '.' + x[:2]) - - - @staticmethod - def make_symbol(series): - return series.astype('int32').astype('str')\ - .apply(str.zfill, args=(6,))\ - .apply(lambda code : \ - code + '.SH' if code[0] == '6' \ - else code + '.SZ') - - - @staticmethod - def make_date(series): - # 特别是对于分红表,如果某些关键日期还未公布,则会填充0,导致日期解析失败 - series.loc[series == 0] = np.nan - return pd.to_datetime( - series.astype(str), format='%Y%m%d') - - - @staticmethod - def make_nparray(series): - return series.apply(lambda x : np.array(x)) - - - @staticmethod - def make_time(series): - s_hr = series // 10000000 * 3600000 - s_min = series % 10000000 // 100000 * 60000 - s_sec = series % 100000 // 1000 - s_ms = series % 1000 - return pd.to_timedelta(s_hr + s_min + s_sec + s_ms, unit='ms') - - - -class DDBPITLoader(DDBLoader): - - ddb_path = "dfs://pit_stock_ts" - ddb_dbname = "ddb_pit_stock_ts" - - num_code_partition = 50 - - table_name_mapping = { - #'CBS_AFTER_ADJ' : 'bs_common_adj', - #'CBS_BEFORE_ADJ' : 'bs_common_ori', - #'CCFS_AFTER_ADJ' : 'cfs_common_adj', - #'CCFS_BEFORE_ADJ' : 'cfs_common_ori', - #'CIS_AFTER_ADJ' : 'is_common_adj', - #'CIS_BEFORE_ADJ' : 'is_common_ori', - 'DIV_WIND' : 'divident', - #'EP_WIND' : 'earnings_preannouncement', - #'PEE_WIND' : 'preliminary_earnings_estimate' - } - - meta_col_config = { - 'WIND_CODE' : ('code', 'SYMBOL'), - # mssql表中不需要记录的meta字段,在这里直接设置为None - 'IntCode' : None, - 'ACTUAL_ANN_DT' : None, - 'ReportPeriod' : ('report_period', 'DATE'), - 'AppearInPeriod' : ('appear_in_period', 'DATE'), - 'AppearAtDate' : ('appear_at_date', 'DATE') - } - - date_col_set = { - 'report_period', - 'appear_in_period', - 'appear_at_date', - 'ReportPeriod', - 'AppearInPeriod', - 'AppearAtDate', - 'EQY_RECORD_DT', - 'EX_DT', - 'DVD_PAYOUT_DT', - 'S_DIV_PRELANDATE', - 'S_DIV_SMTGDATE', - 'DVD_ANN_DT', - 'S_DIV_PREANNDT' - } - - ddb_type_mapping = { - 'float' : 'DOUBLE', - 'int' : 'INT', - 'text' : 'STRING', - 'varchar' : 'STRING', - 'str' : 'STRING' - } - - # 基本面数据库现在存放在91服务器之上 - mssql_config = { - 'host' : '192.168.1.91', - 'username' : 'sa', - 'password' : 'xn.123', - 'dbname' : 'tr_statement' - } - - - def __init__(self): - super().__init__() - # 重新设定mssql_engine对象,此时我们需要使用基本面数据库 - self.mssql_engine = sa.create_engine( - "mssql+pyodbc://{username}:{password}@{host}/{dbname}?driver=ODBC+Driver+18+for+SQL+Server".format(**self.mssql_config), - connect_args = { - "TrustServerCertificate": "yes" - }, echo=False - ) - - - def create_ddb_database(self): - self.ddb_sess.run(""" - {dbName} = database( - directory = '{dbPath}', - partitionType = HASH, - partitionScheme = [SYMBOL, {num_code_partition}], - engine = 'TSDB' - ) - """.format( - dbName = self.ddb_dbname, - dbPath = self.ddb_path, - num_code_partition = self.num_code_partition - )) - - - def _make_col_config(self, mssql_table_name): - """ - Return: - mssql_col_name_list, ddb_col_name_list, ddb_col_type_list - """ - with self.mssql_engine.connect() as conn: - col_sp_list = list(conn.execute(f"exec sp_columns {mssql_table_name}").fetchall()) - - mssql_col_name_list, ddb_col_name_list, ddb_col_type_list = \ - [], [], [] - - for col_sp in col_sp_list: - _col_name = col_sp[3] - _col_type = col_sp[5] - - # 对于meta字段,需要根据meta配置表来进行处理 - if _col_name in self.meta_col_config: - # 跳过mssql表中 不需要记录的meta字段 - if self.meta_col_config[_col_name] is None: - continue - # 字段名和字段类型都要进行映射 - mssql_col_name_list.append(_col_name) - ddb_col_name_list.append(self.meta_col_config[_col_name][0]) - ddb_col_type_list.append(self.meta_col_config[_col_name][1]) - # 对于非meta字段,仅需要检查是否是float类型,对于float类型设置类型为DOUBLE - else: - # 需要之后被转换成DATE的字段,一般在原表中为为INT类型 - if _col_name in self.date_col_set: - mssql_col_name_list.append(_col_name) - ddb_col_name_list.append(_col_name) - ddb_col_type_list.append('DATE') - # 按照对照表进行类型转换 - elif _col_type in self.ddb_type_mapping: - mssql_col_name_list.append(_col_name) - ddb_col_name_list.append(_col_name) - ddb_col_type_list.append(self.ddb_type_mapping[_col_type]) - # 对照表中没有的字段类型,就不加入到字段列表中了 - else: - print(f"!**Unrecognized type '{_col_type}' for column {_col_name}, will skip.") - - return mssql_col_name_list, ddb_col_name_list, ddb_col_type_list - - - def create_ddb_partition_table(self, mssql_table_name): - """创建分区表""" - memory_table_name = mssql_table_name - partition_table_name = self.table_name_mapping[mssql_table_name] - - mssql_col_name_list, ddb_col_name_list, ddb_col_type_list = \ - self._make_col_config(mssql_table_name) - - # 根据是否 - if 'appear_in_period' in ddb_col_name_list: - compress_methods = """{ - 'report_period' : 'delta', - 'appear_in_period' : 'delta', - 'appear_at_date' : 'delta' - }""" - else: - compress_methods = """{ - 'report_period' : 'delta', - 'appear_at_date' : 'delta' - }""" - - # 因为已经根据`appear_in_period`分列了调整前和调整后,因此不需要对它再进行排序了 - sort_columns = "`code`report_period`appear_at_date" - - # 1. 先创建内存表,内存表中设定好列名和列类型 - # 2. 然后根据内存表创建分区表,设定分区列等信息 - self.ddb_sess.run(""" - {memory_table_name} = table( - {capacity}:0, - {column_name_list}, - [{column_type_list}] - ); - - if (existsTable("{ddb_path}", "{partition_table_name}")) {{ - dropTable({ddb_dbname}, "{partition_table_name}"); - }} - - {partition_table_name} = createPartitionedTable( - dbHandle = {ddb_dbname}, - table = {memory_table_name}, - tableName = "{partition_table_name}", - partitionColumns = 'code', - compressMethods = {compress_methods}, - sortColumns = {sort_columns} - ); - """.format( - ddb_dbname = self.ddb_dbname, - ddb_path = self.ddb_path, - memory_table_name = memory_table_name, - partition_table_name = partition_table_name, - capacity = 10, - column_name_list = '`' + '`'.join(ddb_col_name_list), - column_type_list = ','.join(ddb_col_type_list), - compress_methods = compress_methods.replace('\n', '').replace(' ', ''), - sort_columns = sort_columns - )) - print('-' * 80) - print(f"Did create parition table <{partition_table_name}>:") - pprint(self.ddb_sess.run(f"schema({partition_table_name});")) - return partition_table_name, mssql_col_name_list - - - def create_ddb_partition_tables(self): - for mssql_table_name in self.table_name_mapping: - self.create_ddb_partition_table(mssql_table_name) - - - def _dump_pit_to_ddb(self, mssql_table_name): - print('Will work on table', mssql_table_name) - # 返回的`mssql_col_name_list`可以用来对SQL-Server获取的dataframe进行列过滤 - partition_table_name, mssql_col_name_list = \ - self.create_ddb_partition_table(mssql_table_name) - - with self.mssql_engine.connect() as conn: - stat = f"select distinct [WIND_CODE] from {mssql_table_name}" - stock_id_list = list(conn.execute(stat).fetchall()) - - with tqdm(stock_id_list) as pbar: - for (stock_id,) in pbar: - pbar.set_description(f"Will work on {stock_id}") - #pbar.set_description(f"Will fetch all data of {stock_id} from SQL Server") - stat = """ - select * from {mssql_table_name} - where WIND_CODE='{stock_id}' and AppearAtDate>0 - """.format( - mssql_table_name = mssql_table_name, - stock_id = stock_id - ) - row_list = list(conn.execute(stat).fetchall()) - num_rows = len(row_list) - - # 因为对AppearAtDate做了过滤,所以有可能得到一个空的数据集 - if num_rows == 0: - print(f"Will skip {stock_id} due to empty result.") - continue - - #pbar.set_description(f"Will work on dumping job on {stock_id} of len {num_rows}") - # 这里需要对select语句获取的所有列进行一次过滤,以保证和partition table中的列一致 - df = pd.DataFrame(row_list)[mssql_col_name_list] - # 需要把部分字段的int字段类型转换成DATE字段类型 - for df_col in df.columns: - if df_col in self.date_col_set: - df[df_col] = DDBLoader.make_date(df[df_col]) - # 因为在做数据库View的时候已经做过一轮转换了,所以这里就不需要再次转换了 - #df['WIND_CODE'] = DDBLoader.tscode_to_windcode(df['WIND_CODE']) - - self.ddb_sess.upload({mssql_table_name : df}) - self.ddb_sess.run(f"{partition_table_name}.tableInsert({mssql_table_name})") - - - def dump_pit_to_ddb(self): - for mssql_table_name in self.table_name_mapping: - self._dump_pit_to_ddb(mssql_table_name) - - -class DDBDailyLoader(DDBLoader): - - ddb_path = "dfs://daily_stock_ts" - ddb_dbname = "db_daily_stock_ts" - - daily_kline_cols = [ - 'code', 'm_nDate', - 'open', 'high', 'low', 'close', 'vol', - 'amount', 'cjbs', 'yclose', - 'PctChg', 'IsZt', 'IsDt', 'IsST', 'IsGoDelist', - 'FloatShares', 'MarketValues', - 'factor' - ] - - daily_kline_col_types = [ - 'SYMBOL', 'DATE', - 'DOUBLE', 'DOUBLE', 'DOUBLE', 'DOUBLE', 'DOUBLE', - 'DOUBLE', 'INT', 'DOUBLE', - 'DOUBLE', 'INT', 'INT', 'INT', 'INT', - 'DOUBLE', 'DOUBLE', - 'DOUBLE' - ] - - - def create_ddb_database(self): - # TODO: daily数据库已经在DDBDailyFactor中被创建了 - # 后续可以迁移过来,不过现在就暂时先不管了 - pass - - - def load_ddb_database(self): - self.ddb_sess.run(""" - {dbName} = database(directory='{dbPath}') - """.format( - dbName = self.ddb_dbname, - dbPath = self.ddb_path - )) - print('Did load database from', self.ddb_path) - - - def create_ddb_partition_table(self, memory_table_name, partition_table_name): - # TODO: 现在只做一个日频行情数据表,今后可能考虑把基本面数据也迁移过来 - - # 由于日频行情数据的表结构相对简单,所以直接把表结构写在这里代码里即可 - # 搬迁数据的时候需要考虑按照逐个股票来搬迁,以免造成对内存的巨大压力 - self.ddb_sess.run(""" - // 确保删除原表 - if (existsTable("{ddb_daily_path}", "{partition_table_name}")) {{ - dropTable({ddb_daily_dbname}, "{partition_table_name}"); - }} - - // 然后根据内存表的结构,创建持久化的分区表 - {partition_table_name} = {ddb_daily_dbname}.createPartitionedTable( - table = {memory_table_name}, - tableName = "{partition_table_name}", - partitionColumns = `code, - sortColumns = `code`m_nDate, - compressMethods = {{m_nDate:"delta"}} - ); - """.format( - ddb_daily_path = self.ddb_path, - ddb_daily_dbname = self.ddb_dbname, - memory_table_name = memory_table_name, - partition_table_name = partition_table_name, - )) - - - def create_ddb_memory_table(self, memory_table_name, capacity): - self.ddb_sess.run(""" - // 先创建一个空的内存表用来表征结构,如果无需插入数据,capacity可以设为10 - {memory_table_name} = table({capacity}:0, {col_names}, [{col_types}]); - """.format( - memory_table_name = memory_table_name, - capacity = capacity, - col_names = '`' + '`'.join(self.daily_kline_cols), - col_types = ', '.join(self.daily_kline_col_types) - )) - - - def dump_daily_kline_to_ddb(self): - # 先创建一个分区表,然后再逐个股票的数据插入 - # 1. 需要额外控制在插入第一个股票数据的时候创建分区表比较麻烦 - # 2. python程序中的dataframe直接上传到dolphindb内存表,不需要考虑内存表字段类型,分区表中设置好即可 - - memory_table_name = 'daily_kline_mem' - partition_table_name = 'daily_kline' - - self.create_ddb_memory_table(memory_table_name, 10) - print('Did create ddb memory table.') - pprint(self.ddb_sess.run(f"schema({memory_table_name})")) - self.create_ddb_partition_table(memory_table_name, partition_table_name) - print('Did create ddb partition table.') - pprint(self.ddb_sess.run(f"schema({partition_table_name})")) - - with self.mssql_engine.connect() as conn: - stat = "select distinct [StockID] from [StockDaily].dbo.[DailyKLine]" - stock_id_list = list(conn.execute(stat).fetchall()) - - with tqdm(stock_id_list) as pbar: - for (stock_id,) in pbar: - pbar.set_description(f"Will work on {stock_id}") - #pbar.set_description(f"Will fetch all data of {stock_id} from SQL Server") - stat = """ - select * from [StockDaily].dbo.[DailyKLine] - where StockID='{stock_id}' - """.format( - stock_id = stock_id - ) - row_list = list(conn.execute(stat).fetchall()) - num_rows = len(row_list) - - #pbar.set_description(f"Will work on dumping job on {stock_id} of len {num_rows}") - df = pd.DataFrame(row_list) - df['date'] = DDBLoader.make_date(df['date']) - df['StockID'] = DDBLoader.tscode_to_windcode(df['StockID']) - self.ddb_sess.upload({memory_table_name : df}) - #print('Did upload dataframe to ddb.') - #pprint(self.ddb_sess.run(f"schema({memory_table_name})")) - #break - self.ddb_sess.run(f"{partition_table_name}.tableInsert({memory_table_name})") - - - -class DDBHFTLoader(DDBLoader): - """ - 0. 从sql-server中读取calendar数据,并创建成员变量df_calendar,df_calendar可以保存在本地pickle作为缓存 - |- `def make_calendar_df(self) -> df_calendar` - - 1. 创建ddb中的数据库,分区性质从calendar数据中获取 - |- `def create_ddb_database(self, df_calendar) -> void` - |- `def load_ddb_database(self) -> void` - - 2. 在ddb数据库中创建calendar表 - |- `def create_ddb_calendar(self, df_calendar) -> void` - - 3. 创建ddb的分布式表结构 - |- `create_ddb_partition_table(self, hft_type_name)` - |- `_make_table_skeleton(self, hft_type_name, capacity) -> memory_table_name` - - 4. 从sql server的高频数据转录到dolpindb数据库中 - |- `dump_hft_to_ddb(self, type_name, stock_id, trade_date=None)` - """ - - hft_type_list = ['KLine', 'Order', 'Tick', 'TickQueue', 'Transe'] - - protobuff_name_dict = { - name : f"{name}Message_pb2" for name in hft_type_list - } - - protobuff_module_dict = { - type_name : importlib.import_module(f".{module_name}", package='ProtoBuffEntitys') - for type_name, module_name in protobuff_name_dict.items() - } - - protobuff_desc_dict = { - type_name : eval(f"ProtoBuffEntitys.{module_name}.{type_name}Array.{type_name}Data.DESCRIPTOR") - for type_name, module_name in protobuff_name_dict.items() - } - - mssql_name_dict = { - type_name : ( - f"{type_name}" if type_name != 'TickQueue' \ - else f"TickQue" - ) for type_name in hft_type_list - } - - # 数据库路径和数据库名可以不一致 - ddb_path = "dfs://hft_stock_ts" - ddb_dbname = "db_stock_ts" - ddb_memory_table_suffix = "Memroy" - ddb_partition_table_suffix = "Partitioned" - - # calendar表不需要分区,因此需要创建一个新的数据库 - # 该数据库可以是一个简单的csv,现在还不清楚两者的差别 - #ddb_calendar_path = "dfs://daily_calendar" - #ddb_calendar_dbname = "db_calendar" - ddb_calendar_table_name = "Calendar" - - col_type_mapping = { - 'code' : 'SYMBOL', - 'm_nDate' : 'DATE', - 'm_nTime' : 'TIME', - 1 : 'FLOAT', - 3 : 'INT', - 5 : 'INT', - 13 : 'INT', - } - - # this value may be used by factor makers, which may loop through code partitions - num_code_partition = 50 - - num_workers = 8 - default_table_capacity = 10000 - ddb_dump_journal_fname = '../assets/ddb_dump_journal.csv' - - - def init_ddb_database(self, df_calendar): - """ - 1. 创建ddb_database - 2. 创建calendar表 - 3. 创建数据分区表 - """ - # df_calendar还是由外部输入比较方便 - #df_calendar = self.make_calendar_df() - self.create_ddb_database(df_calendar) - self.create_ddb_calendar(df_calendar) - for hft_type_name in self.hft_type_list: - self.create_ddb_partition_table(hft_type_name) - - - def init_ddb_table_data(self, df_calendar, num_workers=None): - """ - 对每个股票进行循环,转录数据到分区表 - """ - stock_list = df_calendar['code'].unique().astype('str') - - # 不能重复创建Pool对象,因此需要在循环的最外侧创建好Pool对象,然后传参进去 - with Pool(self.num_workers if num_workers is None else num_workers) as pool: - for hft_type_name in self.hft_type_list: - print('Will work on hft type:', hft_type_name) - with tqdm(stock_list) as pbar: - for stock_id in pbar: - pbar.set_description(f"Working on stock {stock_id}") - self.dump_hft_to_ddb(hft_type_name, stock_id, pbar=pbar, pool=pool) - - - def _get_stock_date_list(self, cache=False): - """ - Deprecated: This function is replaced by `create_ddb_calendar()`. - """ - if cache: - with open('tmp.pkl', 'rb') as fin: - stock_list, date_list = pickle.load(fin) - else: - with self.mssql_engine.connect() as conn: - # 从KLine表查询,主要是因为KLine表最小 - stat = "select distinct S_INFO_WINDCODE, TRADE_DT from Level2BytesKline.dbo.KLine" - rs = conn.execute(stat) - stock_date_list = [(stock_name, date) for stock_name, date in rs.fetchall()] - stock_list, date_list = zip(*stock_date_list) - - # cache - #with open('tmp.pkl', 'wb') as fout: - # pickle.dump((stock_list, date_list), fout) - - return pd.Series(stock_list, dtype='str').unique(), \ - pd.Series(date_list, dtype='datetime64[D]').unique() - - - def create_ddb_database(self, pd_calendar): - # 从`pd_calendar`中创建`stock_list`和`date_list` - stock_list = pd_calendar['code'].unique().astype('str') - date_list = pd_calendar['m_nDate'].unique().astype('datetime64[D]') - - # 可以把所有股票高频数据放在一个数据库中不同的表 - # 分区策略是跟数据库绑定的,因此需要保证同一个数据库中的表都使用同样的分区额策略 - # 对于股票高频数据,我们可以使用COMPO的分区策略,并且两个子db的分区策略都是VALUE类型的code和m_nDate字段 - if self.ddb_sess.existsDatabase(self.ddb_path): - print('Wiil drop database:', self.ddb_path) - self.ddb_sess.dropDatabase(self.ddb_path) - - # 要创建一个COMPO分区的数据库,需要首先创建两个简单分区的子数据库 - # 这里我们使用先按日期,然后按股票分区的子数据库 - # Please note that when creating a DFS database with COMPO domain, - # the parameter dbPath for each partition level must be either an empty string or unspecified. - db_date = self.ddb_sess.database('db_date', partitionType=keys.VALUE, partitions=date_list, dbPath='') - - # 这里看起来直接使用dolphindb的脚本语句更方便一些 - self.ddb_sess.run(""" - db_stock = database("", 5, [SYMBOL, {num_code_partition}]) - """.format( - num_code_partition = self.num_code_parition - )) - #self.ddb_sess.run(""" - # db_stock = database("", 1, symbol({partitions})) - #""".format( - # partitions = '`' + '`'.join(stock_list) - #)) - - self.ddb_sess.run(""" - {dbName} = database( - directory = '{dbPath}', - partitionType = COMPO, - partitionScheme = [db_date, db_stock], - engine = "TSDB") - """.format( - dbName = self.ddb_dbname, - dbPath = self.ddb_path - )) - - self._load_ddb_dump_journal(recreate=True) - - - def load_ddb_database(self): - db_date = self.ddb_sess.database('db_date', dbPath='') - db_stock = self.ddb_sess.database('db_stock', dbPath='') - - self.ddb_sess.run("{dbName} = database(directory='{dbPath}')".format( - dbName = self.ddb_dbname, - dbPath = self.ddb_path - )) - - self._load_ddb_dump_journal() - - - def _load_ddb_dump_journal(self, recreate=False): - if recreate or not Path(self.ddb_dump_journal_fname).exists(): - print('Will create new dump journal.') - self.dump_journal_writer = open(self.ddb_dump_journal_fname, 'w') - self.dump_journal_writer.write("type_name,stock_id,status\n") - self.dump_journal_writer.flush() - else: - print('Will load previous dump journal.') - self.dump_journal_writer = open(self.ddb_dump_journal_fname, 'a') - - self.dump_journal_df = pd.read_csv(self.ddb_dump_journal_fname) - self.dump_journal_df.set_index(['type_name', 'stock_id', 'status'], inplace=True) - # 因为dump_journal_df只会在创建的时候载入一次数据,之后不会在写入,因此可以在此时对index进行排序 - self.dump_journal_df.sort_index(inplace=True) - print('Did load the dump journal, shape', self.dump_journal_df.shape) - #pprint(self.dump_journal_df.head()) - - - def create_ddb_calendar(self, df_calendar): - mem_table = self.ddb_calendar_table_name + self.ddb_memory_table_suffix - per_table = self.ddb_calendar_table_name - # 1. 创建临时内存表 - # calendar的行数大概是股票数量 * 交易日数量 - self.ddb_sess.run(""" - {table_name} = table({capacity}:0, {col_names}, [{col_types}]); - """.format( - table_name = mem_table, - capacity = 5000 * 1000, - col_names = '`code`m_nDate', - col_types = "SYMBOL, DATE" - )) - print('Did create the memory table') - - # 2. 向内存表中插入所有(code, date)数据 - appender = ddb.tableAppender(tableName=mem_table, ddbSession=self.ddb_sess) - num = appender.append(df_calendar) - print('Did append calendar data into ddb memory table, return code', num) - - # 3. 创建持久化表格之前需要先根据路径创建一个database对象 - # 但研究了一下,发现好像一个database里面可以同时存在分区表和非分区表, - # 所以在这里暂时就不创建新的database了 - # 但因为原database设置成了TSDB,所以必须在createTable的时候指定sortKey - #self.ddb_sess.run(""" - # {db_name} = - #""") - - # 4. 直接从内存表创建一个持久化表格 - if self.ddb_sess.existsTable(self.ddb_path, per_table): - self.ddb_sess.dropTable(self.ddb_path, per_table) - self.ddb_sess.run(""" - tableInsert(createTable( - dbHandle={ddb_dbname}, - table={mem_table}, - tableName=`{per_table}, - sortColumns=`code`m_nDate, - compressMethods={{"m_nDate":"delta"}} - ), {mem_table}) - """.format( - ddb_dbname = self.ddb_dbname, - mem_table = mem_table, - per_table = per_table - )) - print('Did create the persistent table with the memory table') - - - def make_calendar_df(self): - print('Will create calendar dataframe from SQL Server') - # 从KLine表查询,主要是因为KLine表最小 - with self.mssql_engine.connect() as conn: - stat = "select distinct S_INFO_WINDCODE, TRADE_DT from Level2BytesKline.dbo.KLine" - rs = conn.execute(stat) - stock_date_list = [(stock_name, date) for stock_name, date in rs.fetchall()] - - df_calendar = pd.DataFrame(stock_date_list, columns=['code', 'm_nDate']) - df_calendar['m_nDate'] = make_date(df_calendar['m_nDate']) - print('Did make the DataFrame for calendar') - return df_calendar - - - def _make_table_skeleton(self, hft_type_name, table_capacity=default_table_capacity): - - def _make_tbl_config(field_list): - """ - 根据ProtoBuffEntity对象的Descriptor.fields,创建ddb标准的列名列表和列类型列表。 - """ - col_name_list, col_type_list = [], [] - for desc in field_list: - col_name_list.append(desc.name) - # 如果对列明有特殊设定,目前仅包括`code`m_nDate和`m_nTime三个字段 - if desc.name in self.col_type_mapping: - col_type_list.append(self.col_type_mapping[desc.name]) - # 通过对ProtoBuffEntity的类型编号,映射到ddb的类型编号 - # 如果默认值是一个数组,那么ddb类型要额外增加说明是数组 - # ProtoBuffEntity的类型编号只针对基本类型,数组需要通过`default_value`来判断 - else: - col_type = self.col_type_mapping[desc.type] - if isinstance(desc.default_value, list): - col_type += '[]' - col_type_list.append(col_type) - return col_name_list, col_type_list - - desc_obj = self.protobuff_desc_dict[hft_type_name] - col_name_list, col_type_list = _make_tbl_config(desc_obj.fields) - - table_name = hft_type_name + self.ddb_memory_table_suffix - print('-' * 80) - print('Will create table structure:', table_name) - - self.ddb_sess.run(""" - {table_name} = table({capacity}:0, {col_names}, [{col_types}]); - """.format( - table_name = table_name, - capacity = table_capacity, - col_names = '`' + '`'.join(col_name_list), - col_types = ', '.join([f"'{type_name}'" for type_name in col_type_list]) - )) - res = self.ddb_sess.run(f"schema({table_name}).colDefs") - pprint(res) - print('-' * 80) - return table_name - - - def create_ddb_partition_table(self, hft_type_name): - memory_table_name = self._make_table_skeleton(hft_type_name, 10) - partition_table_name = hft_type_name + self.ddb_partition_table_suffix - - print('-' * 80) - print('Will create partitioned table:', partition_table_name) - - self.ddb_sess.run(""" - {ddb_dbname}.createPartitionedTable( - table = {memory_table_name}, - tableName = `{partition_table_name}, - partitionColumns = `m_nDate`code, - sortColumns = `code`m_nDate`m_nTime, - compressMethods = {{m_nDate:"delta", m_nTime:"delta"}} - ) - """.format( - ddb_dbname = self.ddb_dbname, - memory_table_name = memory_table_name, - partition_table_name = partition_table_name - )) - - res = self.ddb_sess.run(f"schema(loadTable('{self.ddb_path}', '{partition_table_name}')).colDefs") - pprint(res) - print('-' * 80) - - - def dump_hft_to_ddb(self, type_name, stock_id, trade_date=None, pbar=None, pool=None): - if (type_name, stock_id, 'OK') in self.dump_journal_df.index: - message = f"Wiil skip ({type_name}, {stock_id}) as it appears in the dump journal." - if pbar is None: - print(message) - else: - pbar.set_description(message) - return - - self.dump_journal_writer.write(f"{type_name},{stock_id},START\n") - self.dump_journal_writer.flush() - - # 经过尝试,按个股来做batch查询效率还是可以接受的 - # mssql中,索引字段是(S_INFO_WINDCODE, TRADE_DT) - with self.mssql_engine.connect() as conn: - stat = """ - select * from [Level2Bytes{mssql_type_name}].dbo.[{mssql_type_name}] - where S_INFO_WINDCODE='{stock_id}' - """.format( - mssql_type_name = self.mssql_name_dict[type_name], - stock_id = stock_id - ) - row_list = list(conn.execute(stat).fetchall()) - num_rows = len(row_list) - - if pbar: - #pbar.set_description(f"Did get the result set for stock {stock_id} from mssql") - pbar.set_description(f"Will work in paralle on dumping job on {stock_id} of len {num_rows}") - else: - print(f"Did get the result set for stock {stock_id} from mssql") - - # 每一行是当个个股某一日的所有高频交易信息 - # 使用多进程来加快速度 - - #with Pool(self.num_workers if num_workers is None else num_workers) as pool: - if pool is None: - print("Will create new Pool object, but this is not encourage for large batch work.") - pool = Pool(self.num_worker) - - # 在单个股票内部,对不同日期进行并行处理,对内存使用较为友好,不需要同时载入多个股票海量的全历史数据 - with tqdm(total=num_rows, leave=False) as sub_pbar: - for _ in pool.imap_unordered( - functools.partial( - dump_stock_daily_to_ddb, - type_name = type_name, - stock_id = stock_id - ), - row_list - ): - sub_pbar.update() - - self.dump_journal_writer.write(f"{type_name},{stock_id},OK\n") - self.dump_journal_writer.flush() - - - @staticmethod - def make_stock_daily_df(blob, type_name, stock_id): - """ - 用于做多进程录入ddb的函数 - """ - blob = gzip.decompress(blob) - dataArray = eval(f"ProtoBuffEntitys.{type_name}Message_pb2.{type_name}Array()") - dataArray.ParseFromString(blob) - - data_dict_list = [ - {field.name : val for field, val in entry.ListFields()} - for entry in dataArray.dataArray - ] - - array_type_list = [ - field.name - for field, val in dataArray.dataArray[0].ListFields() - if isinstance(field.default_value, list) - ] - #pprint(array_type_list) - - df = pd.DataFrame(data_dict_list) - #df['code'] = make_symbol(df['code']) - df['code'] = stock_id - df['m_nDate'] = make_date(df['m_nDate']) - df['m_nTime'] = df['m_nDate'] + make_time(df['m_nTime']) - for field_name in array_type_list: - df[field_name] = make_nparray(df[field_name]) - - #print(f"Did create ddb table for dataframe of shape {df.shape}") - # self.make_table_skeleton(type_name, df.shape[0]) - return df - - - @staticmethod - def dump_stock_daily_to_ddb(row, type_name, stock_id): - """ - 用于做多进程录入ddb的函数 - """ - df_table_name = type_name - df = make_stock_daily_df(row[2], type_name, stock_id) - - ddb_sess = ddb.session(DDBLoader.ddb_config['host'], 8848) - ddb_sess.login(DDBLoader.ddb_config['username'], DDBLoader.ddb_config['password']) - - ddb_sess.upload({df_table_name : df}) - # 因为在做Tick数据的时候,偶然发生'CHUNK[xxx] does not exist.',所以在这里使用`append!`函数代换一下试试 - ddb_sess.run("append!(loadTable('{dbPath}', `{partitioned_table_name}), {df_table_name})".format( - #ddb_sess.run("tableInsert(loadTable('{dbPath}', `{partitioned_table_name}), {df_table_name})".format( - dbPath = DDBLoader.ddb_path, - partitioned_table_name = type_name + DDBLoader.ddb_partition_table_suffix, - df_table_name = df_table_name - )) - - - -def main(): - - # TODO: - # 可以使用`Fire`库,对函数调用再做一次封装,就可以避免每次运行不同参数时候需要修改内部多处的代码。 - - # PIT基本面数据 - loader = DDBPITLoader() - loader.create_ddb_database() - #loader.create_ddb_partition_tables() - loader.dump_pit_to_ddb() - - # 日频行情数据 - #loader = DDBDailyLoader() - #loader.load_ddb_database() - #loader.dump_daily_kline_to_ddb() - - - # 高频数据 - #df_calendar = loader.make_calendar_df() - - #loader.init_ddb_database(df_calendar) - #print('Did finish init_ddb_database') - - #loader.load_ddb_database() - #print('Did load ddb database') - - #loader.init_ddb_table_data(df_calendar) - #print('Did finish init_table_data') - - -if __name__ == '__main__': - main() - diff --git a/src/__init__.py b/src/__init__.py new file mode 100644 index 0000000..9a4d8ab --- /dev/null +++ b/src/__init__.py @@ -0,0 +1,3 @@ +from DDBBase import DDBBase + +__all__ = ['DDBBase'] diff --git a/src/DDBExpression.py b/src/expr/DDBExpression.py similarity index 100% rename from src/DDBExpression.py rename to src/expr/DDBExpression.py diff --git a/src/DDBFactor.py b/src/factor/DDBFactor.py similarity index 100% rename from src/DDBFactor.py rename to src/factor/DDBFactor.py diff --git a/src/loader/DDBBasicInfoLoader.py b/src/loader/DDBBasicInfoLoader.py new file mode 100644 index 0000000..94db59d --- /dev/null +++ b/src/loader/DDBBasicInfoLoader.py @@ -0,0 +1,159 @@ +from pprint import pprint +from pathlib import Path +from tqdm import tqdm +#from tqdm.contrib.concurrent import process_map + +import numpy as np +import pandas as pd + +import dolphindb as ddb +import dolphindb.settings as keys + +import sqlalchemy as sa + +from .DDBLoader import DDBLoader + + +class DDBBasicInfoLoader(DDBLoader): + + ddb_path = "dfs://info_stock_ts" + ddb_dbname = "ddb_info_stock_ts" + + + # 这些映射表似乎没有什么用,每张表手动做一下表结构和插入动作就可以了 + + table_name_mapping = { + 'IndustryInfo' : 'info_industry', + 'ListAndDelist' : 'info_list_delist' + } + + date_col_set = { + 'EnterDate', + 'ExitDate' + 'ListDate', + 'DelistDate' + } + + ddb_type_mapping = { + 'float' : 'DOUBLE', + 'int' : 'INT', + 'text' : 'STRING', + 'varchar' : 'STRING', + 'str' : 'STRING' + } + + # 基本面数据库现在存放在91服务器之上 + mssql_config = { + 'host' : '192.168.1.7', + 'username' : 'sa', + 'password' : 'passw0rd!', + 'dbname' : 'StockInfo' + } + + + def __init__(self): + super().__init__() + # 重新设定mssql_engine对象,此时我们需要使用基本面数据库 + self.mssql_engine = sa.create_engine( + "mssql+pyodbc://{username}:{password}@{host}/{dbname}?driver=ODBC+Driver+18+for+SQL+Server".format(**self.mssql_config), + connect_args = { + "TrustServerCertificate": "yes" + }, echo=False + ) + + + + def create_ddb_database(self): + self.ddb_sess.run(""" + {dbName} = database( + directory = '{dbPath}', + partitionType = HASH, + partitionScheme = [SYMBOL, 10], + engine = 'TSDB' + ) + """.format( + dbName = self.ddb_dbname, + dbPath = self.ddb_path, + )) + + + def _create_info_industry_table(self): + with self.mssql_engine.connect() as conn: + stat = "select * from IndustryInfo" + row_list = list(conn.execute(stat).fetchall()) + + capacity = len(row_list) + df = pd.DataFrame(row_list) + df['StockID'] = self.tscode_to_windcode(df['StockID']) + df['EnterDate'] = self.make_date(df['EnterDate']) + df['ExitDate'] = self.make_date(df['ExitDate']) + + self.ddb_sess.run(f""" + tbl = table( + {capacity}:0, + `code`industry_code`industry_name`industry_level`enter_date`exit_date`is_new`industry_class, + [SYMBOL, SYMBOL, SYMBOL, INT, DATE, DATE, INT, SYMBOL] + ); + """) + + self.ddb_sess.run("tableInsert{tbl}", df) + self.ddb_sess.run(f""" + dropTable({self.ddb_dbname}, "info_industry"); + + info_industry = createPartitionedTable( + dbHandle = {self.ddb_dbname}, + table = tbl, + tableName = "info_industry", + partitionColumns='code', + compressMethods = {{'enter_date' : 'delta', 'exit_date' : 'delta'}}, + sortColumns = ['code', 'enter_date', 'exit_date'] + ); + + info_industry.tableInsert(tbl); + """) + + + def _create_info_list_delist_table(self): + with self.mssql_engine.connect() as conn: + stat = "select * from ListAndDelist" + row_list = list(conn.execute(stat).fetchall()) + + capacity = len(row_list) + df = pd.DataFrame(row_list) + df['StockID'] = self.tscode_to_windcode(df['StockID']) + df['ListDate'] = self.make_date(df['ListDate']) + df['DelistDate'] = self.make_date(df['DelistDate']) + + self.ddb_sess.run(f""" + tbl = table( + {capacity}:0, + `code`list_date`delist_date, + [SYMBOL, DATE, DATE] + ); + """) + + self.ddb_sess.run("tableInsert{tbl}", df) + self.ddb_sess.run(f""" + dropTable({self.ddb_dbname}, "info_list_delist"); + + info_list_delist = createPartitionedTable( + dbHandle = {self.ddb_dbname}, + table = tbl, + tableName = "info_list_delist", + partitionColumns='code', + compressMethods = {{'list_date' : 'delta', 'delist_date' : 'delta'}}, + sortColumns = ['code', 'list_date', 'delist_date'] + ); + + info_list_delist.tableInsert(tbl); + """) + + + def create_ddb_partition_table(self): + """创建分区表""" + self._create_info_industry_table() + self._create_info_list_delist_table() + + + + diff --git a/src/loader/DDBDailyLoader.py b/src/loader/DDBDailyLoader.py new file mode 100644 index 0000000..abdaa8c --- /dev/null +++ b/src/loader/DDBDailyLoader.py @@ -0,0 +1,160 @@ +import pickle +import functools +import abc +import warnings + +from pprint import pprint +from pathlib import Path +from tqdm import tqdm +#from tqdm.contrib.concurrent import process_map +from multiprocessing import Pool + +import numpy as np +import pandas as pd +from pandas.core.common import SettingWithCopyWarning +warnings.simplefilter(action="ignore", category=SettingWithCopyWarning) + +import dolphindb as ddb +import dolphindb.settings as keys + +import sqlalchemy as sa + +import ProtoBuffEntitys + + +class DDBDailyLoader(DDBLoader): + + ddb_path = "dfs://daily_stock_ts" + ddb_dbname = "db_daily_stock_ts" + + daily_kline_cols = [ + 'code', 'm_nDate', + 'open', 'high', 'low', 'close', 'vol', + 'amount', 'cjbs', 'yclose', + 'PctChg', 'IsZt', 'IsDt', 'IsST', 'IsGoDelist', + 'FloatShares', 'MarketValues', + 'factor' + ] + + daily_kline_col_types = [ + 'SYMBOL', 'DATE', + 'DOUBLE', 'DOUBLE', 'DOUBLE', 'DOUBLE', 'DOUBLE', + 'DOUBLE', 'INT', 'DOUBLE', + 'DOUBLE', 'INT', 'INT', 'INT', 'INT', + 'DOUBLE', 'DOUBLE', + 'DOUBLE' + ] + + + def create_ddb_database(self): + # TODO: daily数据库已经在DDBDailyFactor中被创建了 + # 后续可以迁移过来,不过现在就暂时先不管了 + pass + + + def load_ddb_database(self): + self.ddb_sess.run(""" + {dbName} = database(directory='{dbPath}') + """.format( + dbName = self.ddb_dbname, + dbPath = self.ddb_path + )) + print('Did load database from', self.ddb_path) + + + def create_ddb_partition_table(self, memory_table_name, partition_table_name): + # TODO: 现在只做一个日频行情数据表,今后可能考虑把基本面数据也迁移过来 + + # 由于日频行情数据的表结构相对简单,所以直接把表结构写在这里代码里即可 + # 搬迁数据的时候需要考虑按照逐个股票来搬迁,以免造成对内存的巨大压力 + self.ddb_sess.run(""" + // 确保删除原表 + if (existsTable("{ddb_daily_path}", "{partition_table_name}")) {{ + dropTable({ddb_daily_dbname}, "{partition_table_name}"); + }} + + // 然后根据内存表的结构,创建持久化的分区表 + {partition_table_name} = {ddb_daily_dbname}.createPartitionedTable( + table = {memory_table_name}, + tableName = "{partition_table_name}", + partitionColumns = `code, + sortColumns = `code`m_nDate, + compressMethods = {{m_nDate:"delta"}} + ); + """.format( + ddb_daily_path = self.ddb_path, + ddb_daily_dbname = self.ddb_dbname, + memory_table_name = memory_table_name, + partition_table_name = partition_table_name, + )) + + + def create_ddb_memory_table(self, memory_table_name, capacity): + self.ddb_sess.run(""" + // 先创建一个空的内存表用来表征结构,如果无需插入数据,capacity可以设为10 + {memory_table_name} = table({capacity}:0, {col_names}, [{col_types}]); + """.format( + memory_table_name = memory_table_name, + capacity = capacity, + col_names = '`' + '`'.join(self.daily_kline_cols), + col_types = ', '.join(self.daily_kline_col_types) + )) + + + def dump_daily_kline_to_ddb(self): + # 先创建一个分区表,然后再逐个股票的数据插入 + # 1. 需要额外控制在插入第一个股票数据的时候创建分区表比较麻烦 + # 2. python程序中的dataframe直接上传到dolphindb内存表,不需要考虑内存表字段类型,分区表中设置好即可 + + memory_table_name = 'daily_kline_mem' + partition_table_name = 'daily_kline' + + self.create_ddb_memory_table(memory_table_name, 10) + print('Did create ddb memory table.') + pprint(self.ddb_sess.run(f"schema({memory_table_name})")) + self.create_ddb_partition_table(memory_table_name, partition_table_name) + print('Did create ddb partition table.') + pprint(self.ddb_sess.run(f"schema({partition_table_name})")) + + with self.mssql_engine.connect() as conn: + stat = "select distinct [StockID] from [StockDaily].dbo.[DailyKLine]" + stock_id_list = list(conn.execute(stat).fetchall()) + + with tqdm(stock_id_list) as pbar: + for (stock_id,) in pbar: + pbar.set_description(f"Will work on {stock_id}") + #pbar.set_description(f"Will fetch all data of {stock_id} from SQL Server") + stat = """ + select * from [StockDaily].dbo.[DailyKLine] + where StockID='{stock_id}' + """.format( + stock_id = stock_id + ) + row_list = list(conn.execute(stat).fetchall()) + num_rows = len(row_list) + + #pbar.set_description(f"Will work on dumping job on {stock_id} of len {num_rows}") + df = pd.DataFrame(row_list) + df['date'] = DDBLoader.make_date(df['date']) + df['StockID'] = DDBLoader.tscode_to_windcode(df['StockID']) + self.ddb_sess.upload({memory_table_name : df}) + #print('Did upload dataframe to ddb.') + #pprint(self.ddb_sess.run(f"schema({memory_table_name})")) + #break + self.ddb_sess.run(f"{partition_table_name}.tableInsert({memory_table_name})") + + +def main(): + + # TODO: + # 可以使用`Fire`库,对函数调用再做一次封装,就可以避免每次运行不同参数时候需要修改内部多处的代码。 + + # 日频行情数据 + loader = DDBDailyLoader() + loader.load_ddb_database() + #loader.dump_daily_kline_to_ddb() + + +if __name__ == '__main__': + main() + diff --git a/src/loader/DDBHFTLoader.py b/src/loader/DDBHFTLoader.py new file mode 100644 index 0000000..69b7087 --- /dev/null +++ b/src/loader/DDBHFTLoader.py @@ -0,0 +1,484 @@ +import importlib +import gzip +import pickle +import functools +import abc +import warnings + +from pprint import pprint +from pathlib import Path +from tqdm import tqdm +#from tqdm.contrib.concurrent import process_map +from multiprocessing import Pool + +import sqlalchemy as sa +import numpy as np +import pandas as pd +from pandas.core.common import SettingWithCopyWarning +warnings.simplefilter(action="ignore", category=SettingWithCopyWarning) + +import dolphindb as ddb +import dolphindb.settings as keys + +from .ProtoBuffEntitys import KLineMessage_pb2, OrderMessage_pb2, TickMessage_pb2, TickQueueMessage_pb2, TranseMessage_pb2 +from .DDBLoader import DDBLoader + + +class DDBHFTLoader(DDBLoader): + """ + 0. 从sql-server中读取calendar数据,并创建成员变量df_calendar,df_calendar可以保存在本地pickle作为缓存 + |- `def make_calendar_df(self) -> df_calendar` + + 1. 创建ddb中的数据库,分区性质从calendar数据中获取 + |- `def create_ddb_database(self, df_calendar) -> void` + |- `def load_ddb_database(self) -> void` + + 2. 在ddb数据库中创建calendar表 + |- `def create_ddb_calendar(self, df_calendar) -> void` + + 3. 创建ddb的分布式表结构 + |- `create_ddb_partition_table(self, hft_type_name)` + |- `_make_table_skeleton(self, hft_type_name, capacity) -> memory_table_name` + + 4. 从sql server的高频数据转录到dolpindb数据库中 + |- `dump_hft_to_ddb(self, type_name, stock_id, trade_date=None)` + """ + + hft_type_list = ['KLine', 'Order', 'Tick', 'TickQueue', 'Transe'] + + protobuff_name_dict = { + name : f"{name}Message_pb2" for name in hft_type_list + } + + protobuff_module_dict = { + type_name : importlib.import_module(f".{module_name}", package='loader.ProtoBuffEntitys') + for type_name, module_name in protobuff_name_dict.items() + } + + protobuff_desc_dict = { + type_name : eval(f"{module_name}.{type_name}Array.{type_name}Data.DESCRIPTOR") + for type_name, module_name in protobuff_name_dict.items() + } + + mssql_name_dict = { + type_name : ( + f"{type_name}" if type_name != 'TickQueue' \ + else f"TickQue" + ) for type_name in hft_type_list + } + + # 数据库路径和数据库名可以不一致 + ddb_path = "dfs://hft_stock_ts" + ddb_dbname = "db_stock_ts" + ddb_memory_table_suffix = "Memroy" + ddb_partition_table_suffix = "Partitioned" + + # calendar表不需要分区,因此需要创建一个新的数据库 + # 该数据库可以是一个简单的csv,现在还不清楚两者的差别 + #ddb_calendar_path = "dfs://daily_calendar" + #ddb_calendar_dbname = "db_calendar" + ddb_calendar_table_name = "Calendar" + + col_type_mapping = { + 'code' : 'SYMBOL', + 'm_nDate' : 'DATE', + 'm_nTime' : 'TIME', + 1 : 'FLOAT', + 3 : 'INT', + 5 : 'INT', + 13 : 'INT', + } + + # this value may be used by factor makers, which may loop through code partitions + num_code_partition = 50 + + num_workers = 8 + default_table_capacity = 10000 + ddb_dump_journal_fname = '../assets/ddb_dump_journal.csv' + + + def init_ddb_database(self, df_calendar): + """ + 1. 创建ddb_database + 2. 创建calendar表 + 3. 创建数据分区表 + """ + # df_calendar还是由外部输入比较方便 + #df_calendar = self.make_calendar_df() + self.create_ddb_database(df_calendar) + self.create_ddb_calendar(df_calendar) + for hft_type_name in self.hft_type_list: + self.create_ddb_partition_table(hft_type_name) + + + def init_ddb_table_data(self, df_calendar, num_workers=None): + """ + 对每个股票进行循环,转录数据到分区表 + """ + stock_list = df_calendar['code'].unique().astype('str') + + # 不能重复创建Pool对象,因此需要在循环的最外侧创建好Pool对象,然后传参进去 + with Pool(self.num_workers if num_workers is None else num_workers) as pool: + for hft_type_name in self.hft_type_list: + print('Will work on hft type:', hft_type_name) + with tqdm(stock_list) as pbar: + for stock_id in pbar: + pbar.set_description(f"Working on stock {stock_id}") + self.dump_hft_to_ddb(hft_type_name, stock_id, pbar=pbar, pool=pool) + + + def _get_stock_date_list(self, cache=False): + """ + Deprecated: This function is replaced by `create_ddb_calendar()`. + """ + if cache: + with open('tmp.pkl', 'rb') as fin: + stock_list, date_list = pickle.load(fin) + else: + with self.mssql_engine.connect() as conn: + # 从KLine表查询,主要是因为KLine表最小 + stat = "select distinct S_INFO_WINDCODE, TRADE_DT from Level2BytesKline.dbo.KLine" + rs = conn.execute(stat) + stock_date_list = [(stock_name, date) for stock_name, date in rs.fetchall()] + stock_list, date_list = zip(*stock_date_list) + + # cache + #with open('tmp.pkl', 'wb') as fout: + # pickle.dump((stock_list, date_list), fout) + + return pd.Series(stock_list, dtype='str').unique(), \ + pd.Series(date_list, dtype='datetime64[D]').unique() + + + def create_ddb_database(self, pd_calendar): + # 从`pd_calendar`中创建`stock_list`和`date_list` + stock_list = pd_calendar['code'].unique().astype('str') + date_list = pd_calendar['m_nDate'].unique().astype('datetime64[D]') + + # 可以把所有股票高频数据放在一个数据库中不同的表 + # 分区策略是跟数据库绑定的,因此需要保证同一个数据库中的表都使用同样的分区额策略 + # 对于股票高频数据,我们可以使用COMPO的分区策略,并且两个子db的分区策略都是VALUE类型的code和m_nDate字段 + if self.ddb_sess.existsDatabase(self.ddb_path): + print('Wiil drop database:', self.ddb_path) + self.ddb_sess.dropDatabase(self.ddb_path) + + # 要创建一个COMPO分区的数据库,需要首先创建两个简单分区的子数据库 + # 这里我们使用先按日期,然后按股票分区的子数据库 + # Please note that when creating a DFS database with COMPO domain, + # the parameter dbPath for each partition level must be either an empty string or unspecified. + db_date = self.ddb_sess.database('db_date', partitionType=keys.VALUE, partitions=date_list, dbPath='') + + # 这里看起来直接使用dolphindb的脚本语句更方便一些 + self.ddb_sess.run(""" + db_stock = database("", 5, [SYMBOL, {num_code_partition}]) + """.format( + num_code_partition = self.num_code_parition + )) + #self.ddb_sess.run(""" + # db_stock = database("", 1, symbol({partitions})) + #""".format( + # partitions = '`' + '`'.join(stock_list) + #)) + + self.ddb_sess.run(""" + {dbName} = database( + directory = '{dbPath}', + partitionType = COMPO, + partitionScheme = [db_date, db_stock], + engine = "TSDB") + """.format( + dbName = self.ddb_dbname, + dbPath = self.ddb_path + )) + + self._load_ddb_dump_journal(recreate=True) + + + def load_ddb_database(self): + db_date = self.ddb_sess.database('db_date', dbPath='') + db_stock = self.ddb_sess.database('db_stock', dbPath='') + + self.ddb_sess.run("{dbName} = database(directory='{dbPath}')".format( + dbName = self.ddb_dbname, + dbPath = self.ddb_path + )) + + self._load_ddb_dump_journal() + + + def _load_ddb_dump_journal(self, recreate=False): + if recreate or not Path(self.ddb_dump_journal_fname).exists(): + print('Will create new dump journal.') + self.dump_journal_writer = open(self.ddb_dump_journal_fname, 'w') + self.dump_journal_writer.write("type_name,stock_id,status\n") + self.dump_journal_writer.flush() + else: + print('Will load previous dump journal.') + self.dump_journal_writer = open(self.ddb_dump_journal_fname, 'a') + + self.dump_journal_df = pd.read_csv(self.ddb_dump_journal_fname) + self.dump_journal_df.set_index(['type_name', 'stock_id', 'status'], inplace=True) + # 因为dump_journal_df只会在创建的时候载入一次数据,之后不会在写入,因此可以在此时对index进行排序 + self.dump_journal_df.sort_index(inplace=True) + print('Did load the dump journal, shape', self.dump_journal_df.shape) + #pprint(self.dump_journal_df.head()) + + + def create_ddb_calendar(self, df_calendar): + mem_table = self.ddb_calendar_table_name + self.ddb_memory_table_suffix + per_table = self.ddb_calendar_table_name + # 1. 创建临时内存表 + # calendar的行数大概是股票数量 * 交易日数量 + self.ddb_sess.run(""" + {table_name} = table({capacity}:0, {col_names}, [{col_types}]); + """.format( + table_name = mem_table, + capacity = 5000 * 1000, + col_names = '`code`m_nDate', + col_types = "SYMBOL, DATE" + )) + print('Did create the memory table') + + # 2. 向内存表中插入所有(code, date)数据 + appender = ddb.tableAppender(tableName=mem_table, ddbSession=self.ddb_sess) + num = appender.append(df_calendar) + print('Did append calendar data into ddb memory table, return code', num) + + # 3. 创建持久化表格之前需要先根据路径创建一个database对象 + # 但研究了一下,发现好像一个database里面可以同时存在分区表和非分区表, + # 所以在这里暂时就不创建新的database了 + # 但因为原database设置成了TSDB,所以必须在createTable的时候指定sortKey + #self.ddb_sess.run(""" + # {db_name} = + #""") + + # 4. 直接从内存表创建一个持久化表格 + if self.ddb_sess.existsTable(self.ddb_path, per_table): + self.ddb_sess.dropTable(self.ddb_path, per_table) + self.ddb_sess.run(""" + tableInsert(createTable( + dbHandle={ddb_dbname}, + table={mem_table}, + tableName=`{per_table}, + sortColumns=`code`m_nDate, + compressMethods={{"m_nDate":"delta"}} + ), {mem_table}) + """.format( + ddb_dbname = self.ddb_dbname, + mem_table = mem_table, + per_table = per_table + )) + print('Did create the persistent table with the memory table') + + + def make_calendar_df(self): + print('Will create calendar dataframe from SQL Server') + # 从KLine表查询,主要是因为KLine表最小 + with self.mssql_engine.connect() as conn: + stat = "select distinct S_INFO_WINDCODE, TRADE_DT from Level2BytesKline.dbo.KLine" + rs = conn.execute(stat) + stock_date_list = [(stock_name, date) for stock_name, date in rs.fetchall()] + + df_calendar = pd.DataFrame(stock_date_list, columns=['code', 'm_nDate']) + df_calendar['m_nDate'] = self.make_date(df_calendar['m_nDate']) + print('Did make the DataFrame for calendar') + return df_calendar + + + def _make_table_skeleton(self, hft_type_name, table_capacity=default_table_capacity): + + def _make_tbl_config(field_list): + """ + 根据ProtoBuffEntity对象的Descriptor.fields,创建ddb标准的列名列表和列类型列表。 + """ + col_name_list, col_type_list = [], [] + for desc in field_list: + col_name_list.append(desc.name) + # 如果对列明有特殊设定,目前仅包括`code`m_nDate和`m_nTime三个字段 + if desc.name in self.col_type_mapping: + col_type_list.append(self.col_type_mapping[desc.name]) + # 通过对ProtoBuffEntity的类型编号,映射到ddb的类型编号 + # 如果默认值是一个数组,那么ddb类型要额外增加说明是数组 + # ProtoBuffEntity的类型编号只针对基本类型,数组需要通过`default_value`来判断 + else: + col_type = self.col_type_mapping[desc.type] + if isinstance(desc.default_value, list): + col_type += '[]' + col_type_list.append(col_type) + return col_name_list, col_type_list + + desc_obj = self.protobuff_desc_dict[hft_type_name] + col_name_list, col_type_list = _make_tbl_config(desc_obj.fields) + + table_name = hft_type_name + self.ddb_memory_table_suffix + print('-' * 80) + print('Will create table structure:', table_name) + + self.ddb_sess.run(""" + {table_name} = table({capacity}:0, {col_names}, [{col_types}]); + """.format( + table_name = table_name, + capacity = table_capacity, + col_names = '`' + '`'.join(col_name_list), + col_types = ', '.join([f"'{type_name}'" for type_name in col_type_list]) + )) + res = self.ddb_sess.run(f"schema({table_name}).colDefs") + pprint(res) + print('-' * 80) + return table_name + + + def create_ddb_partition_table(self, hft_type_name): + memory_table_name = self._make_table_skeleton(hft_type_name, 10) + partition_table_name = hft_type_name + self.ddb_partition_table_suffix + + print('-' * 80) + print('Will create partitioned table:', partition_table_name) + + self.ddb_sess.run(""" + {ddb_dbname}.createPartitionedTable( + table = {memory_table_name}, + tableName = `{partition_table_name}, + partitionColumns = `m_nDate`code, + sortColumns = `code`m_nDate`m_nTime, + compressMethods = {{m_nDate:"delta", m_nTime:"delta"}} + ) + """.format( + ddb_dbname = self.ddb_dbname, + memory_table_name = memory_table_name, + partition_table_name = partition_table_name + )) + + res = self.ddb_sess.run(f"schema(loadTable('{self.ddb_path}', '{partition_table_name}')).colDefs") + pprint(res) + print('-' * 80) + + + def dump_hft_to_ddb(self, type_name, stock_id, trade_date=None, pbar=None, pool=None): + if (type_name, stock_id, 'OK') in self.dump_journal_df.index: + message = f"Will skip ({type_name}, {stock_id}) as it appears in the dump journal." + if pbar is None: + print(message) + else: + pbar.set_description(message) + return + elif (type_name, stock_id, 'START') in self.dump_journal_df.index: + # 任务已经开始,但是没有完全结束,就需要逐个检查日期,跳过已经录入输入的交易日数据 + # 同时,也不会再往日志文件中写入START记录 + _journal_dt = self.ddb_sess.run(f""" + select m_nDate from tbl where code='{stock_id}' group by m_nDate map; + """).set_index('m_nDate') + else: + _journal_dt = None + self.dump_journal_writer.write(f"{type_name},{stock_id},START\n") + self.dump_journal_writer.flush() + + # 经过尝试,按个股来做batch查询效率还是可以接受的 + # mssql中,索引字段是(S_INFO_WINDCODE, TRADE_DT) + with self.mssql_engine.connect() as conn: + stat = """ + select * from [Level2Bytes{mssql_type_name}].dbo.[{mssql_type_name}] + where S_INFO_WINDCODE='{stock_id}' + """.format( + mssql_type_name = self.mssql_name_dict[type_name], + stock_id = stock_id + ) + row_list = list(conn.execute(stat).fetchall()) + + # 如果`_journal_dt`不为空,则说明之前的日志中表明改股票数据已经部分完成,需要逐个核对日期 + # 这里只把日期值不再`_journal_dt`的记录放入`row_list` + if _journal_dt is not None: + row_list = [row for row in row_list + if pd.to_datetime(row[1]) not in _journal_dt] + print(f"Resume job for {stock_id}, with {len(row_list)} rows left.") + + num_rows = len(row_list) + # 如果行数为0,则说明是空数据,可以直接返回 + if num_rows == 0: + return + + if pbar: + #pbar.set_description(f"Did get the result set for stock {stock_id} from mssql") + pbar.set_description(f"Will work in paralle on dumping job on {stock_id} of len {num_rows}") + else: + print(f"Did get the result set for stock {stock_id} from mssql") + + # 每一行是当个个股某一日的所有高频交易信息 + # 使用多进程来加快速度 + + #with Pool(self.num_workers if num_workers is None else num_workers) as pool: + if pool is None: + print("Will create new Pool object, but this is not encourage for large batch work.") + pool = Pool(self.num_worker) + + # 在单个股票内部,对不同日期进行并行处理,对内存使用较为友好,不需要同时载入多个股票海量的全历史数据 + with tqdm(total=num_rows, leave=False) as sub_pbar: + for _ in pool.imap_unordered( + functools.partial( + dump_stock_daily_to_ddb, + type_name = type_name, + stock_id = stock_id + ), + row_list + ): + sub_pbar.update() + + self.dump_journal_writer.write(f"{type_name},{stock_id},OK\n") + self.dump_journal_writer.flush() + + + @staticmethod + def make_stock_daily_df(blob, type_name, stock_id): + """ + 用于做多进程录入ddb的函数 + """ + blob = gzip.decompress(blob) + dataArray = eval(f"ProtoBuffEntitys.{type_name}Message_pb2.{type_name}Array()") + dataArray.ParseFromString(blob) + + data_dict_list = [ + {field.name : val for field, val in entry.ListFields()} + for entry in dataArray.dataArray + ] + + array_type_list = [ + field.name + for field, val in dataArray.dataArray[0].ListFields() + if isinstance(field.default_value, list) + ] + #pprint(array_type_list) + + df = pd.DataFrame(data_dict_list) + #df['code'] = make_symbol(df['code']) + df['code'] = stock_id + df['m_nDate'] = self.make_date(df['m_nDate']) + df['m_nTime'] = df['m_nDate'] + self.make_time(df['m_nTime']) + for field_name in array_type_list: + df[field_name] = self.make_nparray(df[field_name]) + + #print(f"Did create ddb table for dataframe of shape {df.shape}") + # self.make_table_skeleton(type_name, df.shape[0]) + return df + + + @staticmethod + def dump_stock_daily_to_ddb(row, type_name, stock_id): + """ + 用于做多进程录入ddb的函数 + """ + df_table_name = type_name + df = make_stock_daily_df(row[2], type_name, stock_id) + + ddb_sess = ddb.session(DDBLoader.ddb_config['host'], 8848) + ddb_sess.login(DDBLoader.ddb_config['username'], DDBLoader.ddb_config['password']) + + ddb_sess.upload({df_table_name : df}) + # 因为在做Tick数据的时候,偶然发生'CHUNK[xxx] does not exist.',所以在这里使用`append!`函数代换一下试试 + ddb_sess.run("append!(loadTable('{dbPath}', `{partitioned_table_name}), {df_table_name})".format( + #ddb_sess.run("tableInsert(loadTable('{dbPath}', `{partitioned_table_name}), {df_table_name})".format( + dbPath = DDBLoader.ddb_path, + partitioned_table_name = type_name + DDBLoader.ddb_partition_table_suffix, + df_table_name = df_table_name + )) + + diff --git a/src/loader/DDBLoader.py b/src/loader/DDBLoader.py new file mode 100644 index 0000000..636fe5a --- /dev/null +++ b/src/loader/DDBLoader.py @@ -0,0 +1,100 @@ +import importlib +import pickle +import functools +import abc + +from pprint import pprint +from pathlib import Path + +import numpy as np +import pandas as pd + +import dolphindb as ddb +import dolphindb.settings as keys + +import sqlalchemy as sa + +from DDBBase import DDBBase + + +class DDBLoader(DDBBase): + """ + - 放了几个公用的配置字段,包括: + 1. SQL-Server的链接参数 + 2. DolphinDB的链接参数 + + - 放了几个@abstractmethod在里面,不过如果不需要使用多态特性,那应该用处不大: + 1. create_ddb_database + 2. create_ddb_partition_table + """ + + mssql_config = { + 'host' : '192.168.1.7', + 'username' : 'sa', + 'password' : 'passw0rd!' + } + + def __init__(self): + super().__init__() + self.mssql_engine = sa.create_engine( + "mssql+pyodbc://{username}:{password}@{host}/master?driver=ODBC+Driver+18+for+SQL+Server".format(**self.mssql_config), + connect_args = { + "TrustServerCertificate": "yes" + }, echo=False + ) + + + @abc.abstractmethod + def create_ddb_database(self, *args, **kwargs): + """ + 创建database函数,需要被子类具体实现。 + """ + return + + + @abc.abstractmethod + def create_ddb_partition_table(self, *args, **kwargs): + """ + 创建分区表函数,需要被子类具体实现。 + """ + return + + + @staticmethod + def tscode_to_windcode(series): + return series.apply(lambda x : x[2:] + '.' + x[:2]) + + + @staticmethod + def make_symbol(series): + return series.astype('int32').astype('str')\ + .apply(str.zfill, args=(6,))\ + .apply(lambda code : \ + code + '.SH' if code[0] == '6' \ + else code + '.SZ') + + + @staticmethod + def make_date(series): + # 特别是对于分红表,如果某些关键日期还未公布,则会填充0,导致日期解析失败 + series.loc[series == 0] = np.nan + return pd.to_datetime( + series.astype(str), format='%Y%m%d') + + + @staticmethod + def make_nparray(series): + return series.apply(lambda x : np.array(x)) + + + @staticmethod + def make_time(series): + s_hr = series // 10000000 * 3600000 + s_min = series % 10000000 // 100000 * 60000 + s_sec = series % 100000 // 1000 + s_ms = series % 1000 + return pd.to_timedelta(s_hr + s_min + s_sec + s_ms, unit='ms') + + + + diff --git a/src/loader/DDBPITLoader.py b/src/loader/DDBPITLoader.py new file mode 100644 index 0000000..95575a6 --- /dev/null +++ b/src/loader/DDBPITLoader.py @@ -0,0 +1,287 @@ +import warnings + +from pprint import pprint +from pathlib import Path +from tqdm import tqdm + +import numpy as np +import pandas as pd +from pandas.core.common import SettingWithCopyWarning +warnings.simplefilter(action="ignore", category=SettingWithCopyWarning) + +import dolphindb as ddb +import dolphindb.settings as keys + +import sqlalchemy as sa + +from DDBLoader import DDBLoader + + +class DDBPITLoader(DDBLoader): + + ddb_path = "dfs://pit_stock_ts" + ddb_dbname = "ddb_pit_stock_ts" + + num_code_partition = 50 + + table_name_mapping = { + #'CBS_AFTER_ADJ' : 'bs_common_adj', + #'CBS_BEFORE_ADJ' : 'bs_common_ori', + #'CCFS_AFTER_ADJ' : 'cfs_common_adj', + #'CCFS_BEFORE_ADJ' : 'cfs_common_ori', + #'CIS_AFTER_ADJ' : 'is_common_adj', + #'CIS_BEFORE_ADJ' : 'is_common_ori', + 'DIV_WIND' : 'divident', + #'EP_WIND' : 'earnings_preannouncement', + #'PEE_WIND' : 'preliminary_earnings_estimate' + } + + meta_col_config = { + 'WIND_CODE' : ('code', 'SYMBOL'), + # mssql表中不需要记录的meta字段,在这里直接设置为None + 'IntCode' : None, + 'ACTUAL_ANN_DT' : None, + 'ReportPeriod' : ('report_period', 'DATE'), + 'AppearInPeriod' : ('appear_in_period', 'DATE'), + 'AppearAtDate' : ('appear_at_date', 'DATE') + } + + date_col_set = { + 'report_period', + 'appear_in_period', + 'appear_at_date', + 'ReportPeriod', + 'AppearInPeriod', + 'AppearAtDate', + 'EQY_RECORD_DT', + 'EX_DT', + 'DVD_PAYOUT_DT', + 'S_DIV_PRELANDATE', + 'S_DIV_SMTGDATE', + 'DVD_ANN_DT', + 'S_DIV_PREANNDT' + } + + ddb_type_mapping = { + 'float' : 'DOUBLE', + 'int' : 'INT', + 'text' : 'STRING', + 'varchar' : 'STRING', + 'str' : 'STRING' + } + + # 基本面数据库现在存放在91服务器之上 + mssql_config = { + 'host' : '192.168.1.91', + 'username' : 'sa', + 'password' : 'xn.123', + 'dbname' : 'tr_statement' + } + + + def __init__(self): + super().__init__() + # 重新设定mssql_engine对象,此时我们需要使用基本面数据库 + self.mssql_engine = sa.create_engine( + "mssql+pyodbc://{username}:{password}@{host}/{dbname}?driver=ODBC+Driver+18+for+SQL+Server".format(**self.mssql_config), + connect_args = { + "TrustServerCertificate": "yes" + }, echo=False + ) + + + def create_ddb_database(self): + self.ddb_sess.run(""" + {dbName} = database( + directory = '{dbPath}', + partitionType = HASH, + partitionScheme = [SYMBOL, {num_code_partition}], + engine = 'TSDB' + ) + """.format( + dbName = self.ddb_dbname, + dbPath = self.ddb_path, + num_code_partition = self.num_code_partition + )) + + + def _make_col_config(self, mssql_table_name): + """ + Return: + mssql_col_name_list, ddb_col_name_list, ddb_col_type_list + """ + with self.mssql_engine.connect() as conn: + col_sp_list = list(conn.execute(f"exec sp_columns {mssql_table_name}").fetchall()) + + mssql_col_name_list, ddb_col_name_list, ddb_col_type_list = \ + [], [], [] + + for col_sp in col_sp_list: + _col_name = col_sp[3] + _col_type = col_sp[5] + + # 对于meta字段,需要根据meta配置表来进行处理 + if _col_name in self.meta_col_config: + # 跳过mssql表中 不需要记录的meta字段 + if self.meta_col_config[_col_name] is None: + continue + # 字段名和字段类型都要进行映射 + mssql_col_name_list.append(_col_name) + ddb_col_name_list.append(self.meta_col_config[_col_name][0]) + ddb_col_type_list.append(self.meta_col_config[_col_name][1]) + # 对于非meta字段,仅需要检查是否是float类型,对于float类型设置类型为DOUBLE + else: + # 需要之后被转换成DATE的字段,一般在原表中为为INT类型 + if _col_name in self.date_col_set: + mssql_col_name_list.append(_col_name) + ddb_col_name_list.append(_col_name) + ddb_col_type_list.append('DATE') + # 按照对照表进行类型转换 + elif _col_type in self.ddb_type_mapping: + mssql_col_name_list.append(_col_name) + ddb_col_name_list.append(_col_name) + ddb_col_type_list.append(self.ddb_type_mapping[_col_type]) + # 对照表中没有的字段类型,就不加入到字段列表中了 + else: + print(f"!**Unrecognized type '{_col_type}' for column {_col_name}, will skip.") + + return mssql_col_name_list, ddb_col_name_list, ddb_col_type_list + + + def create_ddb_partition_table(self, mssql_table_name): + """创建分区表""" + memory_table_name = mssql_table_name + partition_table_name = self.table_name_mapping[mssql_table_name] + + mssql_col_name_list, ddb_col_name_list, ddb_col_type_list = \ + self._make_col_config(mssql_table_name) + + # 根据是否 + if 'appear_in_period' in ddb_col_name_list: + compress_methods = """{ + 'report_period' : 'delta', + 'appear_in_period' : 'delta', + 'appear_at_date' : 'delta' + }""" + else: + compress_methods = """{ + 'report_period' : 'delta', + 'appear_at_date' : 'delta' + }""" + + # 因为已经根据`appear_in_period`分列了调整前和调整后,因此不需要对它再进行排序了 + sort_columns = "`code`report_period`appear_at_date" + + # 1. 先创建内存表,内存表中设定好列名和列类型 + # 2. 然后根据内存表创建分区表,设定分区列等信息 + self.ddb_sess.run(""" + {memory_table_name} = table( + {capacity}:0, + {column_name_list}, + [{column_type_list}] + ); + + if (existsTable("{ddb_path}", "{partition_table_name}")) {{ + dropTable({ddb_dbname}, "{partition_table_name}"); + }} + + {partition_table_name} = createPartitionedTable( + dbHandle = {ddb_dbname}, + table = {memory_table_name}, + tableName = "{partition_table_name}", + partitionColumns = 'code', + compressMethods = {compress_methods}, + sortColumns = {sort_columns} + ); + """.format( + ddb_dbname = self.ddb_dbname, + ddb_path = self.ddb_path, + memory_table_name = memory_table_name, + partition_table_name = partition_table_name, + capacity = 10, + column_name_list = '`' + '`'.join(ddb_col_name_list), + column_type_list = ','.join(ddb_col_type_list), + compress_methods = compress_methods.replace('\n', '').replace(' ', ''), + sort_columns = sort_columns + )) + print('-' * 80) + print(f"Did create parition table <{partition_table_name}>:") + pprint(self.ddb_sess.run(f"schema({partition_table_name});")) + return partition_table_name, mssql_col_name_list + + + def create_ddb_partition_tables(self): + for mssql_table_name in self.table_name_mapping: + self.create_ddb_partition_table(mssql_table_name) + + + def _dump_pit_to_ddb(self, mssql_table_name): + print('Will work on table', mssql_table_name) + # 返回的`mssql_col_name_list`可以用来对SQL-Server获取的dataframe进行列过滤 + partition_table_name, mssql_col_name_list = \ + self.create_ddb_partition_table(mssql_table_name) + + with self.mssql_engine.connect() as conn: + stat = f"select distinct [WIND_CODE] from {mssql_table_name}" + stock_id_list = list(conn.execute(stat).fetchall()) + + with tqdm(stock_id_list) as pbar: + for (stock_id,) in pbar: + pbar.set_description(f"Will work on {stock_id}") + #pbar.set_description(f"Will fetch all data of {stock_id} from SQL Server") + stat = """ + select * from {mssql_table_name} + where WIND_CODE='{stock_id}' and AppearAtDate>0 + """.format( + mssql_table_name = mssql_table_name, + stock_id = stock_id + ) + row_list = list(conn.execute(stat).fetchall()) + num_rows = len(row_list) + + # 因为对AppearAtDate做了过滤,所以有可能得到一个空的数据集 + if num_rows == 0: + print(f"Will skip {stock_id} due to empty result.") + continue + + #pbar.set_description(f"Will work on dumping job on {stock_id} of len {num_rows}") + # 这里需要对select语句获取的所有列进行一次过滤,以保证和partition table中的列一致 + df = pd.DataFrame(row_list)[mssql_col_name_list] + # 需要把部分字段的int字段类型转换成DATE字段类型 + for df_col in df.columns: + if df_col in self.date_col_set: + df[df_col] = DDBLoader.make_date(df[df_col]) + # 因为在做数据库View的时候已经做过一轮转换了,所以这里就不需要再次转换了 + #df['WIND_CODE'] = DDBLoader.tscode_to_windcode(df['WIND_CODE']) + + self.ddb_sess.upload({mssql_table_name : df}) + self.ddb_sess.run(f"{partition_table_name}.tableInsert({mssql_table_name})") + + + def dump_pit_to_ddb(self): + for mssql_table_name in self.table_name_mapping: + self._dump_pit_to_ddb(mssql_table_name) + + +def main(): + + # TODO: + # 可以使用`Fire`库,对函数调用再做一次封装,就可以避免每次运行不同参数时候需要修改内部多处的代码。 + + # PIT基本面数据 + loader = DDBPITLoader() + loader.create_ddb_database() + #loader.create_ddb_partition_tables() + loader.dump_pit_to_ddb() + + # 日频行情数据 + #loader = DDBDailyLoader() + #loader.load_ddb_database() + #loader.dump_daily_kline_to_ddb() + + +if __name__ == '__main__': + main() + + + diff --git a/src/ProtoBuffEntitys/HFDataTableMessage_pb2.py b/src/loader/ProtoBuffEntitys/HFDataTableMessage_pb2.py similarity index 100% rename from src/ProtoBuffEntitys/HFDataTableMessage_pb2.py rename to src/loader/ProtoBuffEntitys/HFDataTableMessage_pb2.py diff --git a/src/ProtoBuffEntitys/IndexFutureKLineMessage_pb2.py b/src/loader/ProtoBuffEntitys/IndexFutureKLineMessage_pb2.py similarity index 100% rename from src/ProtoBuffEntitys/IndexFutureKLineMessage_pb2.py rename to src/loader/ProtoBuffEntitys/IndexFutureKLineMessage_pb2.py diff --git a/src/ProtoBuffEntitys/IndexFutureL1TickMessage_pb2.py b/src/loader/ProtoBuffEntitys/IndexFutureL1TickMessage_pb2.py similarity index 100% rename from src/ProtoBuffEntitys/IndexFutureL1TickMessage_pb2.py rename to src/loader/ProtoBuffEntitys/IndexFutureL1TickMessage_pb2.py diff --git a/src/ProtoBuffEntitys/IndexKLineMessage_pb2.py b/src/loader/ProtoBuffEntitys/IndexKLineMessage_pb2.py similarity index 100% rename from src/ProtoBuffEntitys/IndexKLineMessage_pb2.py rename to src/loader/ProtoBuffEntitys/IndexKLineMessage_pb2.py diff --git a/src/ProtoBuffEntitys/IndexTickMessage_pb2.py b/src/loader/ProtoBuffEntitys/IndexTickMessage_pb2.py similarity index 100% rename from src/ProtoBuffEntitys/IndexTickMessage_pb2.py rename to src/loader/ProtoBuffEntitys/IndexTickMessage_pb2.py diff --git a/src/ProtoBuffEntitys/KLineMessage_pb2.py b/src/loader/ProtoBuffEntitys/KLineMessage_pb2.py similarity index 100% rename from src/ProtoBuffEntitys/KLineMessage_pb2.py rename to src/loader/ProtoBuffEntitys/KLineMessage_pb2.py diff --git a/src/ProtoBuffEntitys/OrderMessage_pb2.py b/src/loader/ProtoBuffEntitys/OrderMessage_pb2.py similarity index 100% rename from src/ProtoBuffEntitys/OrderMessage_pb2.py rename to src/loader/ProtoBuffEntitys/OrderMessage_pb2.py diff --git a/src/ProtoBuffEntitys/TickMessage_pb2.py b/src/loader/ProtoBuffEntitys/TickMessage_pb2.py similarity index 100% rename from src/ProtoBuffEntitys/TickMessage_pb2.py rename to src/loader/ProtoBuffEntitys/TickMessage_pb2.py diff --git a/src/ProtoBuffEntitys/TickQueueMessage_pb2.py b/src/loader/ProtoBuffEntitys/TickQueueMessage_pb2.py similarity index 100% rename from src/ProtoBuffEntitys/TickQueueMessage_pb2.py rename to src/loader/ProtoBuffEntitys/TickQueueMessage_pb2.py diff --git a/src/ProtoBuffEntitys/TranseMessage_pb2.py b/src/loader/ProtoBuffEntitys/TranseMessage_pb2.py similarity index 100% rename from src/ProtoBuffEntitys/TranseMessage_pb2.py rename to src/loader/ProtoBuffEntitys/TranseMessage_pb2.py diff --git a/src/loader/ProtoBuffEntitys/__init__.py b/src/loader/ProtoBuffEntitys/__init__.py new file mode 100644 index 0000000..859dd1d --- /dev/null +++ b/src/loader/ProtoBuffEntitys/__init__.py @@ -0,0 +1,4 @@ +# import KLineMessage_pb2, OrderMessage_pb2, TickMessage_pb2 + +# __all__ = ['KLineMessage_pb2', 'OrderMessage_pb2', 'TickMessage_pb2'] + diff --git a/src/make_hft.py b/src/loader/make_hft.py similarity index 100% rename from src/make_hft.py rename to src/loader/make_hft.py diff --git a/src/run.py b/src/run.py new file mode 100644 index 0000000..602ee50 --- /dev/null +++ b/src/run.py @@ -0,0 +1,31 @@ + +from loader.DDBHFTLoader import DDBHFTLoader +from loader.DDBBasicInfoLoader import DDBBasicInfoLoader + +def main(): + + # TODO: + # 可以使用`Fire`库,对函数调用再做一次封装,就可以避免每次运行不同参数时候需要修改内部多处的代码。 + + #loader = DDBBasicInfoLoader() + #loader.create_ddb_database() + #loader.create_ddb_partition_table() + + # 高频数据 + loader = DDBHFTLoader() + df_calendar = loader.make_calendar_df() + + #loader.init_ddb_database(df_calendar) + #print('Did finish init_ddb_database') + + loader.load_ddb_database() + print('Did load ddb database') + + loader.init_ddb_table_data(df_calendar) + print('Did finish init_table_data') + + + +if __name__ == '__main__': + main() +