somehow is running eeeee

main
yzlocal 2 years ago
parent 4d2ad2b411
commit aed5e94395

File diff suppressed because one or more lines are too long

@ -3,7 +3,7 @@ import dolphindb.settings as keys
import numpy as np import numpy as np
import pandas as pd import pandas as pd
from loguru import logger from loguru import logger
import time
class DDBfm(): class DDBfm():
ddb_config_servers = { ddb_config_servers = {
@ -36,15 +36,42 @@ class DDBfm():
ddf_hft_mink_tbname="MinKlinePartitioned" ddf_hft_mink_tbname="MinKlinePartitioned"
def __init__(self, which_server='local', pool=False):
    """Connect to the DolphinDB server configured under ``which_server``.

    Args:
        which_server: key into ``ddb_config_servers`` selecting the host
            and credentials to use.
        pool: when falsy, open and log in a single session (``self.sess``);
            when truthy, build a connection pool (``self.pool``) instead.
    """
    # Only one of sess / pool is populated below; the other stays None.
    self.sess = None
    self.pool = None
    self.hft_db = None
    self.which_server = which_server
    self.ddb_config = self.ddb_config_servers[self.which_server]
    cfg = self.ddb_config
    if pool:
        # NOTE(review): 8848 is presumably the server port and 12 the number
        # of pooled connections - confirm against DBConnectionPool.
        self.pool = ddb.DBConnectionPool(cfg['host'], 8848, 12, cfg['username'], cfg['password'])
        return
    self.sess = ddb.session(cfg['host'], 8848)
    self.sess.login(cfg['username'], cfg['password'])
def clear_pool(self):
    """Wait until the connection pool reports it is finished, then shut it down.

    Does nothing when this instance was created without a pool.
    """
    pool = self.pool
    if not pool:
        return
    # NOTE(review): `isFinished` is read as an attribute here. In the DolphinDB
    # Python API it appears to be a method taking a task id; if so, the bound
    # method is always truthy and this loop never waits - confirm.
    while True:
        if pool.isFinished:
            break
        logger.info("pool not finished,sleep for 5 seconds.")
        time.sleep(5)
    print("pool finished")
    logger.debug("pool finished")
    pool.shutDown()
def close_sess(self):
    """Release every DolphinDB resource held by this instance.

    Shuts down the connection pool (if one was created) and then clears the
    cache of, and closes, the plain session (if one was opened).
    """
    # The pool teardown was a verbatim copy of clear_pool(); delegate so the
    # wait/shutdown logic lives in one place. clear_pool() is a no-op when
    # there is no pool.
    self.clear_pool()
    if self.sess:
        self.sess.clearAllCache()
        self.sess.close()
def drop_dbpath(self,dbPath): def drop_dbpath(self,dbPath):
if self.sess.existsDatabase(dbPath): if self.sess.existsDatabase(dbPath):
self.sess.dropDatabase(dbPath) self.sess.dropDatabase(dbPath)
@ -59,7 +86,7 @@ class DDBfm():
db_init = self.sess.database(dbName='db_init', partitionType=keys.VALUE, partitions=self.all_fm_init, dbPath='') db_init = self.sess.database(dbName='db_init', partitionType=keys.VALUE, partitions=self.all_fm_init, dbPath='')
months=np.array(pd.date_range(start='2000-01', end='2050-12', freq="M"), dtype="datetime64[M]") months=np.array(pd.date_range(start='2000-01', end='2050-12', freq="M"), dtype="datetime64[M]")
logger.debug(months) logger.debug(f'created months len: {len(months)}')
db_date = self.sess.database('db_date', partitionType=keys.VALUE, partitions=months, dbPath='') db_date = self.sess.database('db_date', partitionType=keys.VALUE, partitions=months, dbPath='')
@ -76,7 +103,7 @@ class DDBfm():
def create_hft_table(self, tbName, df): def create_hft_table(self, tbName, df):
t = self.sess.table(data=df, tableAliasName=tbName) t = self.sess.table(data=df, tableAliasName=tbName)
if self.which_server == 'prd': if self.which_server == 'prd':
pt = self.hft_db.createPartitionedTable(table=t, tableName=tbName, partitionColumns=['m_nDatetime', 'code_init'],sortColumns=["code","m_nDatetime"]) pt = self.hft_db.createPartitionedTable(table=t, tableName=tbName, partitionColumns=['m_nDatetime', 'code_init'],sortColumns=["code","m_nDatetime"],compressMethods={"m_nDatetime":"delta"})
else: else:
pt = self.hft_db.createPartitionedTable(table=t, tableName=tbName, partitionColumns=['m_nDatetime', 'code_init']) pt = self.hft_db.createPartitionedTable(table=t, tableName=tbName, partitionColumns=['m_nDatetime', 'code_init'])
pt.append(t) pt.append(t)
@ -85,7 +112,14 @@ class DDBfm():
def append_hft_table(self, tbName, df):
    """Append ``df`` to table ``tbName`` of the HFT database through the plain session.

    Args:
        tbName: name of the target table under ``self.ddb_hft_dbPath``.
        df: DataFrame holding the rows to append.
    """
    ddb.tableAppender(
        tableName=tbName,
        ddbSession=self.sess,
        dbPath=self.ddb_hft_dbPath,
    ).append(df)
    logger.info(f"sucessfully append some df of {df.shape} to {tbName}")
def append_pool_hft_table(self, tbName, df):
    """Append ``df`` to table ``tbName`` of the HFT database through the connection pool.

    Args:
        tbName: name of the target table under ``self.ddb_hft_dbPath``.
        df: DataFrame holding the rows to append.
    """
    # 'm_nDatetime' is handed to the appender as the partitioning column.
    pooled_appender = ddb.PartitionedTableAppender(
        self.ddb_hft_dbPath,
        tbName,
        'm_nDatetime',
        self.pool,
    )
    pooled_appender.append(df)
    # Logged after append() has returned, despite the progressive wording.
    logger.info(f"Appending some df of {df.shape} to {tbName}")
def search_code_date_in_tb(self,tbName,curr_date,curr_code): def search_code_date_in_tb(self,tbName,curr_date,curr_code):
curr_date_formatted = curr_date[:4]+'.'+curr_date[4:6]+'.'+curr_date[6:] curr_date_formatted = curr_date[:4]+'.'+curr_date[4:6]+'.'+curr_date[6:]
@ -96,6 +130,7 @@ class DDBfm():
try: try:
# doing this cuz there's no method to check if a table is empty lol # doing this cuz there's no method to check if a table is empty lol
cond=f"code=`{curr_code}, m_nDatetime.date()={curr_date_formatted}d" cond=f"code=`{curr_code}, m_nDatetime.date()={curr_date_formatted}d"
cond=f"code_init=`{curr_code[:2]}, m_nDatetime.date()={curr_date_formatted}d"
# print(cond) # print(cond)
df = tb.select('*').where(cond).top(1).toDF() df = tb.select('*').where(cond).top(1).toDF()
if df.empty or df.shape[0]==0: if df.empty or df.shape[0]==0:
@ -105,6 +140,35 @@ class DDBfm():
return 0 return 0
return 1 return 1
def load_tb(self,tableName):
    """Return a handle to ``tableName`` inside the HFT database (``self.ddb_hft_dbPath``)."""
    session = self.sess
    handle = session.loadTable(tableName=tableName, dbPath=self.ddb_hft_dbPath)
    return handle
def get_missing_code_date_in_tb(self,tb,curr_date,code_list): # code list has to be like all CF
    """Return the codes from ``code_list`` that have no rows in ``tb`` on ``curr_date``.

    Args:
        tb: table handle exposing ``select(...).where(...).toDF()``. A table
            name (``str``) is also accepted and resolved through ``load_tb``.
        curr_date: day to check, as a ``YYYYMMDD`` string.
        code_list: contract codes that all share one product prefix
            (e.g. all ``CF*``); only the first entry is used to build the
            ``code_init`` filter.

    Returns:
        The members of ``code_list`` (original order) that the table does not
        contain for that day. ``code_list`` itself is returned unchanged when
        it is empty, when the query finds nothing, or when the query fails
        (best-effort: a failed lookup is treated as "everything is missing").
    """
    if not code_list:
        # Nothing to look up; also avoids indexing code_list[0] below.
        return code_list
    if isinstance(tb, str):
        # Tolerate callers that pass the table name instead of a handle.
        tb = self.load_tb(tb)

    curr_date_formatted = f"{curr_date[:4]}.{curr_date[4:6]}.{curr_date[6:]}"
    logger.info(f"Quickly checking if data on {'+'.join(code_list)} {curr_date} exists...") # could do a slow checking of num of data

    # NOTE(review): code_init is taken as the first two characters of the code,
    # while the runner groups codes by c[:-4]; for one-letter products such as
    # 'T2212' the two disagree ('T2' vs 'T') - confirm how the column is filled.
    cond = f"code_init=`{code_list[0][:2]}, m_nDatetime.date()={curr_date_formatted}d"
    try:
        # There is no direct "is this table empty" call, so query the distinct codes.
        df = tb.select('distinct code').where(cond).toDF()
    except Exception as exc:
        # Deliberately best-effort, but no longer silent and no longer
        # swallowing KeyboardInterrupt / SystemExit.
        logger.warning(f"Existence check failed ({exc!r}); treating all codes as missing.")
        return code_list
    if df.empty:
        return code_list

    ddb_codes = set(df['distinct_code'])
    logger.debug(f"codes already stored: {sorted(ddb_codes)}")
    return [c for c in code_list if c not in ddb_codes]
if __name__ == '__main__': if __name__ == '__main__':
pass pass

@ -15,13 +15,13 @@ class TSLfm:
self.timeout_default=100000000 self.timeout_default=100000000
def __enter__(self): def __enter__(self):
logger.info('Logging in TSL.') logger.debug('Logging in TSL.')
self.c = pyTSL.Client(TINYSOFT_USERNAME, TINYSOFT_PASSWORD, TINYSOFT_HOSTNAME, 443) self.c = pyTSL.Client(TINYSOFT_USERNAME, TINYSOFT_PASSWORD, TINYSOFT_HOSTNAME, 443)
self.c.login() self.c.login()
return self return self
def __exit__(self, exc_type, exc_val, exc_tb): def __exit__(self, exc_type, exc_val, exc_tb):
logger.info('Logging out TSL.') logger.debug('Logging out TSL.')
self.c.logout() self.c.logout()
del(self.c) del(self.c)
@ -215,7 +215,7 @@ class TSLfm:
if df.empty: if df.empty:
logger.info('No data on this day.') logger.info('No data on this day.')
return pd.DataFrame() return pd.DataFrame()
logger.info(f"Processing new df of shape {df.shape}, which looks like\n{df.head(5)}") logger.debug(f"Processing new df of shape {df.shape}, which looks like\n{df.head(5)}")
# new = df["m_nDatetime"].str.split(" ", n = 1, expand = True) # new = df["m_nDatetime"].str.split(" ", n = 1, expand = True)
df["m_nDatetime"]=df["m_nDatetime"].astype('datetime64') df["m_nDatetime"]=df["m_nDatetime"].astype('datetime64')
@ -229,7 +229,8 @@ class TSLfm:
for col in ['m_iABFlag']: for col in ['m_iABFlag']:
df[col]=df[col].astype(np.int8) df[col]=df[col].astype(np.int8)
logger.info(f"Processing done, new df looks like\n{df.head(5)}") logger.info(f"Processing result df of shape {df.shape} done")
logger.debug(f"New df looks like\n{df.head(5)}")
return df return df
@ -238,6 +239,6 @@ if __name__ == '__main__':
logger.add("../logs/{time:YYYYMMDD-HHmmss}_TSLfm.log", rotation="10 MB", compression="zip", level="INFO") logger.add("../logs/{time:YYYYMMDD-HHmmss}_TSLfm.log", rotation="10 MB", compression="zip", level="INFO")
with TSLfm() as tsl: with TSLfm() as tsl:
t_list=['T2212'] t_list=['CF2211']
df = tsl.process_result_data_type(tsl.get_mkt_min_k('20221031','20221101',t_list)) df = tsl.process_result_data_type(tsl.get_mkt_min_k('20221031','20221101',t_list))
print(df) print(df)

@ -8,8 +8,7 @@ ROOT_DIR = abspath(join(dirname(abspath(__file__)), ".."))
from loguru import logger from loguru import logger
logger.remove() logger.remove()
logger.add(sys.stderr, level="INFO") logger.add(sys.stderr, level="INFO")
logger.add(ROOT_DIR+"/logs/{time:YYYYMMDD-HHmmss}"+f"_{running_which_env}.log", rotation="10 MB", compression="zip", level="INFO") logger.add(ROOT_DIR+"/logs/{time:YYYYMMDD-HHmmss}"+f"_{running_which_env}.log", rotation="10 MB", compression="zip", level="DEBUG")
import pandas as pd import pandas as pd
@ -37,15 +36,84 @@ def run_add_1day_code_init_minKline(date,code_list):
logger.info(f'Getting a df of {df.shape}: {code_list[0][:-4]} on {date}') logger.info(f'Getting a df of {df.shape}: {code_list[0][:-4]} on {date}')
ddb.append_hft_table(ddb.ddf_hft_mink_tbname,df) ddb.append_hft_table(ddb.ddf_hft_mink_tbname,df)
def run_create_db_minKline(): # def run_pool_add_byday_code_init_minKline(date_list,code_list):
date = '20221101' # df_list=[]
# code_list_filtered=code_list
# ddb1 = DDBfm(running_which_env)
# tb=ddb1.load_tb(tableName=ddb1.ddf_hft_mink_tbname)
# # tb=ddb1.sess.loadTable(dbPath=ddb1.ddb_hft_dbPath, tableName=ddb1.ddf_hft_mink_tbname)
# for date in date_list:
# with TSLfm() as tsl:
# df = tsl.process_result_data_type(tsl.get_mkt_min_k(date,date,code_list))
# if df.empty:
# continue
# code_list_filtered = ddb1.get_missing_code_date_in_tb(tb,date,code_list)
# if len(code_list_filtered)==0:
# continue
# logger.info(f"getting {'+'.join(code_list_filtered)} on {date}")
# df=df[df['code'].isin(code_list_filtered)]
# df_list.append(df)
# ddb1.close_sess()
# del ddb1
# if df_list:
# df_all = pd.concat(df_list)
# ddb2 = DDBfm(running_which_env,pool=True)
# logger.info(f'Getting a df of {df_all.shape}: {code_list[0][:-4]} on {"+".join(date_list)}')
# ddb2.append_pool_hft_table(ddb2.ddf_hft_mink_tbname,df_all)
# ddb2.clear_pool()
# del ddb2
def run_pool_add_byday_code_init_tick(date_list,code_list):
    """Fetch missing tick data for ``code_list`` over ``date_list`` and append it in one pooled write.

    For every date, the tick table is queried for the codes that are already
    stored; only the missing ones are requested from TSL. All non-empty daily
    frames are concatenated and appended through a connection pool.

    Args:
        date_list: days to load, as ``YYYYMMDD`` strings.
        code_list: contract codes sharing one product prefix.
    """
    df_list = []
    for date in date_list:
        ddb1 = DDBfm(running_which_env)
        try:
            # Check the *tick* table (the one written below) and hand over a
            # table handle: the lookup calls tb.select(...), so a bare table
            # name made the check fail and every code look missing.
            tick_tb = ddb1.load_tb(ddb1.ddf_hft_tick_tbname)
            code_list_filtered = ddb1.get_missing_code_date_in_tb(tick_tb, date, code_list)
        finally:
            # Always release the session, including on the "nothing missing" path.
            ddb1.close_sess()
            del ddb1
        if not code_list_filtered:
            continue
        logger.info(f"getting {'+'.join(code_list_filtered)} on {date}")

        with TSLfm() as tsl:
            df = tsl.process_result_data_type(tsl.get_trade_tick(date,date,code_list_filtered))
        if not df.empty:
            df_list.append(df)

    if not df_list:
        # pd.concat([]) raises ValueError; there is simply nothing to append.
        logger.info("No new tick data to append.")
        return

    df_all = pd.concat(df_list)
    ddb2 = DDBfm(running_which_env,pool=True)
    try:
        logger.info(f'Getting a df of {df_all.shape}: {code_list[0][:-4]} on {"+".join(date_list)}')
        ddb2.append_pool_hft_table(ddb2.ddf_hft_tick_tbname,df_all)
    finally:
        # Shut the pool down even when the append raises.
        ddb2.close_sess()
        del ddb2
def run_create_hft_db(date = '20221101'):
    """Create the HFT database and its min-K and tick tables from one day of T2212 data.

    Args:
        date: day (``YYYYMMDD``) whose data is used to create each table.
    """
    seed_codes = ['T2212']

    fm = DDBfm(running_which_env)
    fm.create_hft_database()

    # Minute K-line table.
    with TSLfm() as tsl:
        raw_mink = tsl.get_mkt_min_k(date, date, seed_codes)
        df_mink = tsl.process_result_data_type(raw_mink)
    fm.create_hft_table(fm.ddf_hft_mink_tbname, df_mink)

    # Tick table, fetched in a separate TSL login just like the min-K data.
    with TSLfm() as tsl:
        raw_tick = tsl.get_trade_tick(date, date, seed_codes)
        df_tick = tsl.process_result_data_type(raw_tick)
    fm.create_hft_table(fm.ddf_hft_tick_tbname, df_tick)
def run(): def run():
@ -60,38 +128,79 @@ def run():
# print(all_code_dict_by_init) # print(all_code_dict_by_init)
start_date='2022-09-30' start_date='2022-09-30'
end_date='2022-11-09' end_date='2022-10-31'
allDates = pd.date_range(start_date, end_date, freq ='D') allDates = pd.date_range(start_date, end_date, freq ='D')
allDates = [i.replace('-','') for i in list(allDates.astype('str'))] allDates = [i.replace('-','') for i in list(allDates.astype('str'))]
for date in allDates: for date in allDates:
for code_init in all_code_dict_by_init: for ind,code_init in enumerate(all_code_dict_by_init):
logger.info(f"Getting {code_init} (no.{ind})")
code_list = all_code_dict_by_init[code_init] code_list = all_code_dict_by_init[code_init]
run_add_1day_code_init_minKline(date,code_list) run_add_1day_code_init_minKline(date,code_list)
# date = '20221101'
# with TSLfm() as tsl:
# # code_list = tsl.get_code_list("国债期货")
# # code_list += tsl.get_code_list("股指期货")
# # code_list += tsl.get_code_list("上市期货")
# # code_list=sorted(list(set(code_list)))
# # print(code_list_pickel)
# code_list=['CF2211']
# df = tsl.process_result_data_type(tsl.get_mkt_min_k(date,date,code_list))
# print(df)
def run_pool_dates_by_code_init_n_group(typ='mink',group_amount=10,start_date='20220101',end_date='20221031'):
    """Load data group by group, where dates are grouped by day-of-month modulo ``group_amount``.

    Args:
        typ: ``'tick'`` runs the pooled tick loader; ``'mink'`` only logs an
            error because that path is currently disabled.
        group_amount: number of date groups (the modulus applied to the day of month).
        start_date: first day of the range, passed to ``pd.date_range``.
        end_date: last day of the range, passed to ``pd.date_range``.
    """
    logger.info("Running run_pool_dates_by_group")

    # Product prefix (code without its last four characters) -> codes.
    all_code_dict_by_init = {}
    for code in code_list_pickel:
        all_code_dict_by_init.setdefault(code[:-4], []).append(code)

    # Group number -> YYYYMMDD dates whose day of month falls in that group.
    dates_dict_by_day = {}
    for day_str in pd.date_range(start_date, end_date, freq='D').astype('str'):
        bucket = int(day_str[-2:]) % group_amount
        dates_dict_by_day.setdefault(bucket, []).append(day_str.replace('-', ''))
    logger.debug(dates_dict_by_day)

    num_of_init = len(all_code_dict_by_init)
    for group_no, date_list in dates_dict_by_day.items():
        for ind, (code_init, code_list) in enumerate(all_code_dict_by_init.items()):
            # done: 'T','TS','TS','TF'
            # todo: widen this filter ('TF', 'IC', 'IF', 'IH', 'IM', ...)
            if code_init not in ['T']:
                continue
            logger.info(f"Getting {code_init} (no.{ind}/{num_of_init} of group {group_no}/{group_amount})")
            if typ == 'mink':
                # The by-day min-K loader is commented out until it is fixed.
                logger.error('mink by day to be fixed')
            elif typ == 'tick':
                logger.info('Running tick')
                run_pool_add_byday_code_init_tick(date_list, code_list)
if __name__ == '__main__':
    # Other entry points, kept for manual runs:
    # run()
    # run_create_hft_db() # including two tables
    import time

    started = time.perf_counter()
    run_pool_dates_by_code_init_n_group(typ='tick')
    # run_pool_dates_by_code_init_n_group(typ='mink',group_amount=5)
    elapsed = time.perf_counter() - started
    logger.info(f"Running used {elapsed:0.4f} seconds")
    # Timing notes from an earlier run (presumably all 'T' tasks):
    #   588.5782 seconds for 10 months of data, i.e. roughly 10 min;
    #   budget about 12 min per code_init.
    #   12 min * 71 code_inits = ~850 min = ~15 hours for all codes per year.

@ -7,9 +7,16 @@ loadTable("dfs://hft_stock_ts","KLinePartitioned")
listTables("dfs:/hft_futuremarket_ts"); listTables("dfs:/hft_futuremarket_ts");
pt=loadTable("dfs://hft_futuremarket_ts","MinKlinePartitioned") pt=loadTable("dfs://hft_futuremarket_ts","MinKlinePartitioned")
select top 40 * from pt pt=loadTable("dfs://hft_futuremarket_ts","TickPartitioned")
select top 40 * from pt where code=`T2212 and m_nDatetime.date() >=2022.01.01d
select top 4 * from pt where code_init=`TS and m_nDatetime.date()>=2022.01.01d
select distinct m_nDatetime.date() from pt where code=`AP2211
select count(*) from loadTable("dfs://hft_futuremarket_ts", "MinKlinePartitioned") select count(*) from loadTable("dfs://hft_futuremarket_ts", "MinKlinePartitioned")
select count(*) from loadTable("dfs://hft_futuremarket_ts", "TickPartitioned")
select top 40 * from pt
schema(pt) schema(pt)
n=1000000 n=1000000

Loading…
Cancel
Save