things work rn

main
yzlocal 2 years ago
parent 3b0b17c04e
commit 82d3c28895

File diff suppressed because it is too large Load Diff

@ -2,7 +2,7 @@ import dolphindb as ddb
import dolphindb.settings as keys import dolphindb.settings as keys
import numpy as np import numpy as np
import pandas as pd import pandas as pd
# from loguru import logger from loguru import logger
class DDBfm(): class DDBfm():
@ -28,17 +28,17 @@ class DDBfm():
ddb_daily_path = "dfs://daily_futuremarket_ts" ddb_daily_path = "dfs://daily_futuremarket_ts"
ddb_daily_dbname = "db_daily_fm" ddb_daily_dbname = "db_daily_fm"
ddb_hft_path="dfs://hft_futuremarket_ts" ddb_hft_dbPath="dfs://hft_futuremarket_ts"
ddb_hft_dbName="hft_futuremarket_ts"
ddb_hft_tick_dbname="Tick"
ddf_hft_tick_tbname="TickPartitioned" ddf_hft_tick_tbname="TickPartitioned"
ddb_hft_mink_dbname="MinKline"
ddf_hft_mink_tbname="MinKlinePartitioned" ddf_hft_mink_tbname="MinKlinePartitioned"
def __init__(self, which_server='local', **kwargs): def __init__(self, which_server='local', **kwargs):
self.hft_db=None
self.ddb_config = self.ddb_config_servers[which_server] self.ddb_config = self.ddb_config_servers[which_server]
self.sess = ddb.session(self.ddb_config['host'], 8848) self.sess = ddb.session(self.ddb_config['host'], 8848)
@ -48,26 +48,35 @@ class DDBfm():
if self.sess.existsDatabase(dbPath): if self.sess.existsDatabase(dbPath):
self.sess.dropDatabase(dbPath) self.sess.dropDatabase(dbPath)
def create_ddb_database(self,dbPath,dbName): def create_hft_database(self):
dbPath=self.ddb_hft_dbPath
dbName=self.ddb_hft_dbName
if self.sess.existsDatabase(dbPath):
self.sess.dropDatabase(dbPath)
db_init = self.sess.database(dbName='db_init', partitionType=keys.VALUE, partitions=self.all_fm_init, dbPath='') db_init = self.sess.database(dbName='db_init', partitionType=keys.VALUE, partitions=self.all_fm_init, dbPath='')
months=np.array(pd.date_range(start='2000-01', end='2050-12', freq="M"), dtype="datetime64[M]") months=np.array(pd.date_range(start='2000-01', end='2050-12', freq="M"), dtype="datetime64[M]")
# print(months) logger.debug(months)
db_date = self.sess.database('db_date', partitionType=keys.VALUE, partitions=months, dbPath='') db_date = self.sess.database('db_date', partitionType=keys.VALUE, partitions=months, dbPath='')
if self.sess.existsDatabase(dbPath): if self.sess.existsDatabase(dbPath):
self.sess.dropDatabase(dbPath) self.sess.dropDatabase(dbPath)
db = self.sess.database(dbName=dbName, partitionType=keys.COMPO, partitions=[db_date, db_init], dbPath=dbPath, engine='OLAP') self.hft_db = self.sess.database(dbName=dbName, partitionType=keys.COMPO, partitions=[db_date, db_init], dbPath=dbPath, engine='OLAP')
# todo: engine=TSDB # todo: engine=TSDB
# db = self.sess.database(dbName=dbName, partitionType=keys.COMPO, partitions=[db_date, db_init], dbPath=dbPath, engine='TSDB')
return db logger.info(f'Created dbName:{dbName} at dbPath:{dbPath}')
def add_new_hft_table(self, db, tbName, df): def create_hft_table(self, tbName, df):
t = self.sess.table(data=df,tableAliasName=tbName) t = self.sess.table(data=df, tableAliasName=tbName)
pt =db.createPartitionedTable(table=t, tableName=tbName, partitionColumns=['m_nDate', 'code_init']) pt = self.hft_db.createPartitionedTable(table=t, tableName=tbName, partitionColumns=['m_nDatetime', 'code_init'])
# pt = self.hft_db.createPartitionedTable(table=t, tableName=tbName, partitionColumns=['m_nDate', 'code_init'],sortColumns=["code","m_nDatetime"])
pt.append(t) pt.append(t)
def append_hft_table(self, dbPath, tbName, df): def append_hft_table(self, dbPath, tbName, df):
# load table to check? time&date&code # load table to check? time&date&code
appender = ddb.tableAppender(tableName=tbName, ddbSession=self.sess,dbPath=dbPath) appender = ddb.tableAppender(tableName=tbName, ddbSession=self.sess,dbPath=dbPath)

@ -1,8 +1,6 @@
import itertools
from datetime import datetime
import pandas as pd import pandas as pd
from pprint import pprint import numpy as np
from loguru import logger
import pyTSL import pyTSL
@ -17,13 +15,13 @@ class TSLfm:
self.timeout_default=100000000 self.timeout_default=100000000
def __enter__(self): def __enter__(self):
print('entering') logger.info('Logging in TSL.')
self.c = pyTSL.Client(TINYSOFT_USERNAME, TINYSOFT_PASSWORD, TINYSOFT_HOSTNAME, 443) self.c = pyTSL.Client(TINYSOFT_USERNAME, TINYSOFT_PASSWORD, TINYSOFT_HOSTNAME, 443)
self.c.login() self.c.login()
return self return self
def __exit__(self, exc_type, exc_val, exc_tb): def __exit__(self, exc_type, exc_val, exc_tb):
print('logging out') logger.info('Logging out TSL.')
self.c.logout() self.c.logout()
del(self.c) del(self.c)
@ -31,7 +29,7 @@ class TSLfm:
scpt = """return getbk('{bk_name}'); scpt = """return getbk('{bk_name}');
""".format(bk_name=bk_name) """.format(bk_name=bk_name)
print(scpt) logger.debug(scpt)
r = self.c.exec(scpt,timeout=self.timeout_default) r = self.c.exec(scpt,timeout=self.timeout_default)
return r.value() return r.value()
@ -75,7 +73,7 @@ class TSLfm:
stock_list := Array({code_list_input}); stock_list := Array({code_list_input});
r := select r := select
['StockID'] as 'code', ['StockID'] as 'code',
DateTimeToStr(['date']) as 'datetime', DateTimeToStr(['date']) as 'm_nDatetime',
['price'] as 'm_nPrice', ['price'] as 'm_nPrice',
['open'] as 'm_nOpen', ['open'] as 'm_nOpen',
@ -135,7 +133,7 @@ class TSLfm:
start_date=start_date, start_date=start_date,
end_date=end_date end_date=end_date
) )
print(scpt) logger.debug(scpt)
r = self.c.exec(scpt,timeout=self.timeout_default) r = self.c.exec(scpt,timeout=self.timeout_default)
return r return r
@ -147,7 +145,7 @@ class TSLfm:
stock_list := Array({code_list_input}); stock_list := Array({code_list_input});
r := select r := select
['StockID'] as 'code', ['StockID'] as 'code',
DateTimeToStr(['date']) as 'datetime', DateTimeToStr(['date']) as 'm_nDatetime',
['price'] as 'm_nPrice', ['price'] as 'm_nPrice',
['open'] as 'm_nOpen', ['open'] as 'm_nOpen',
@ -208,10 +206,35 @@ class TSLfm:
start_date=start_date, start_date=start_date,
end_date=end_date end_date=end_date
) )
print(scpt) logger.debug(scpt)
r = self.c.exec(scpt,timeout=self.timeout_default) r = self.c.exec(scpt,timeout=self.timeout_default)
return r return r
def process_result_data_type(self, r): def process_result_data_type(self, r):
df = pd.DataFrame(r.value()) df = pd.DataFrame(r.value())
print(df.shape) logger.info(f"Processing new df of shape {df.shape}, which looks like\n{df.head(5)}")
# new = df["m_nDatetime"].str.split(" ", n = 1, expand = True)
df["m_nDatetime"]=df["m_nDatetime"].astype('datetime64[s]')
# df['m_nDate']=new[0].astype('datetime64[D]')
df['code_init']=df['code'].apply(lambda x:x[:-4])
for col in ['m_iVolume','m_iAccVolume','m_nMatchItems','m_nAccMatchItems','m_nBidVolume','m_nAskVolume','m_nActBidVolume','m_nAccActBidVolume','m_nActAskVolume','m_nAccActAskVolume']:
df[col]=df[col].astype(np.int16)
for col in ['m_iABFlag']:
df[col]=df[col].astype(np.int8)
logger.info(f"Processing done, new df looks like\n{df.head(5)}")
return df
if __name__ == '__main__':
from loguru import logger
logger.add("../logs/{time:YYYYMMDD-HHmmss}_TSLfm.log", rotation="10 MB", compression="zip", level="INFO")
with TSLfm() as tsl:
t_list=['CF2211']
df = tsl.process_result_data_type(tsl.get_mkt_min_k('20221031','20221101',t_list))
print(df)

@ -0,0 +1,40 @@
import sys
# import os
from loguru import logger
from os.path import dirname, abspath, join
ROOT_DIR = abspath(join(dirname(abspath(__file__)), ".."))
from loguru import logger
logger.remove()
logger.add(sys.stderr, level="INFO")
logger.add(ROOT_DIR+"/logs/{time:YYYYMMDD-HHmmss}.log", rotation="10 MB", compression="zip", level="INFO")
from DDBfm import DDBfm
from TSLfm import TSLfm
def main():
df=None
with TSLfm() as tsl:
# code_list = tsl.get_code_list("国债期货")
# code_list += tsl.get_code_list("股指期货")
# code_list += tsl.get_code_list("上市期货")
# code_list=sorted(list(set(code_list)))
begin_date='20221031'
end_date='20221101'
code_list=['CF2211']
df = tsl.process_result_data_type(tsl.get_mkt_min_k(begin_date,end_date,code_list))
print(df)
ddb = DDBfm('dev')
ddb.create_hft_database()
ddb.create_hft_table(ddb.ddf_hft_mink_tbname,df)
if __name__ == '__main__':
main()

@ -0,0 +1,29 @@
getAllDBs();
getDFSDatabases();
getScheduledJobs();
loadTableBySQL(tableName='pt',dbPath="dfs://tsdb",sql="select * limit 10;");
loadTable("dfs://tsdb","pt")
loadTable("dfs://hft_stock_ts","KLinePartitioned")
listTables("dfs:/hft_futuremarket_ts");
pt=loadTable("dfs://hft_futuremarket_ts","MinKlinePartitioned")
select top 40 * from pt
select count(*) from loadTable("dfs://hft_futuremarket_ts", "MinKlinePartitioned")
schema(pt)
n=1000000
ID=rand(100, n)
dates=2017.08.07..2017.08.11
date=rand(dates, n)
x=rand(10.0, n)
t=table(ID, date, x)
dbDate = database(, VALUE, 2017.08.07..2017.08.11)
dbID=database(, RANGE, 0 50 100)
db = database("dfs://compoDB", COMPO, [dbDate, dbID])
pt = db.createPartitionedTable(t, `pt, `date`ID)
pt.append!(t)
pt=loadTable(db,`pt)
select count(x) from pt
Loading…
Cancel
Save