more updates

main
yzlocal 2 years ago
parent f225e38c73
commit 4e35f2de8b

@ -60,13 +60,13 @@ class DDBfm():
self.pool.shutDown() self.pool.shutDown()
def close_sess(self): def close_sess(self):
if self.pool: # if self.pool:
while not self.pool.isFinished: # while not self.pool.isFinished:
logger.info("pool not finished,sleep for 5 seconds.") # logger.info("pool not finished,sleep for 5 seconds.")
time.sleep(5) # time.sleep(5)
print("pool finished") # print("pool finished")
logger.debug("pool finished") # logger.debug("pool finished")
self.pool.shutDown() # self.pool.shutDown()
if self.sess: if self.sess:
self.sess.clearAllCache() self.sess.clearAllCache()
@ -143,16 +143,16 @@ class DDBfm():
def load_tb(self,tableName): def load_tb(self,tableName):
return self.sess.loadTable(dbPath=self.ddb_hft_dbPath, tableName=tableName) return self.sess.loadTable(dbPath=self.ddb_hft_dbPath, tableName=tableName)
def get_missing_code_date_in_tb(self,tb,curr_date,code_list): # code list has to be like all CF def get_missing_code_date_in_tb(self,tbName,curr_date,code_list): # code list has to be like all CF
curr_date_formatted = curr_date[:4]+'.'+curr_date[4:6]+'.'+curr_date[6:] curr_date_formatted = curr_date[:4]+'.'+curr_date[4:6]+'.'+curr_date[6:]
# print('?did i split this right') # print('?did i split this right')
# print(curr_date_formatted) # print(curr_date_formatted)
# tb = self.sess.loadTable(dbPath=self.ddb_hft_dbPath, tableName=tbName) tb = self.sess.loadTable(dbPath=self.ddb_hft_dbPath, tableName=tbName)
logger.info(f"Quickly checking if data on {'+'.join(code_list)} {curr_date} exists...") # could do a slow checking of num of data logger.info(f"Quickly checking if data on {'+'.join(code_list)} {curr_date} exists...") # could do a slow checking of num of data
try: try:
# doing this cuz there's no method to check if a table is empty lol # doing this cuz there's no method to check if a table is empty lol
# cond=f"code=`{curr_code}, m_nDatetime.date()={curr_date_formatted}d" # cond=f"code=`{curr_code}, m_nDatetime.date()={curr_date_formatted}d"
cond=f"code_init=`{code_list[0][:2]}, m_nDatetime.date()={curr_date_formatted}d" cond=f"code_init=`{code_list[0][:-4]}, m_nDatetime.date()={curr_date_formatted}d"
# print(cond) # print(cond)
df = tb.select('distinct code').where(cond).toDF() df = tb.select('distinct code').where(cond).toDF()
@ -167,6 +167,8 @@ class DDBfm():
for c in code_list: for c in code_list:
if c not in ddb_code_list: if c not in ddb_code_list:
return_code_list.append(c) return_code_list.append(c)
else:
logger.warning(f'{c} on {curr_date} is recorded')
return return_code_list return return_code_list

@ -215,7 +215,7 @@ class TSLfm:
if df.empty: if df.empty:
logger.info('No data on this day.') logger.info('No data on this day.')
return pd.DataFrame() return pd.DataFrame()
logger.debug(f"Processing new df of shape {df.shape}, which looks like\n{df.head(5)}") # logger.debug(f"Processing new df of shape {df.shape}, which looks like\n{df.head(5)}")
# new = df["m_nDatetime"].str.split(" ", n = 1, expand = True) # new = df["m_nDatetime"].str.split(" ", n = 1, expand = True)
df["m_nDatetime"]=df["m_nDatetime"].astype('datetime64') df["m_nDatetime"]=df["m_nDatetime"].astype('datetime64')
@ -230,7 +230,7 @@ class TSLfm:
df[col]=df[col].astype(np.int8) df[col]=df[col].astype(np.int8)
logger.info(f"Processing result df of shape {df.shape} done") logger.info(f"Processing result df of shape {df.shape} done")
logger.debug(f"New df looks like\n{df.head(5)}") logger.debug(f"New df looks like\n{df.head(1)}")
return df return df

@ -1,6 +1,6 @@
import sys import sys
running_which_env='prd' running_which_env='dev'
from os.path import dirname, abspath, join from os.path import dirname, abspath, join
@ -69,31 +69,43 @@ def run_add_1day_code_init_minKline(date,code_list):
# del ddb2 # del ddb2
def check_if_date_codelist_exists(date,code_list): def check_if_date_codelist_exists(typ,date,code_list):
code_list_filtered=code_list code_list_filtered=code_list
ddb1 = DDBfm(running_which_env) ddb1 = DDBfm(running_which_env)
code_list_filtered = ddb1.get_missing_code_date_in_tb(ddb1.ddf_hft_mink_tbname,date,code_list)
if typ=='tick':
tbName = ddb1.ddf_hft_tick_tbname
elif typ=='mink':
tbName = ddb1.ddf_hft_mink_tbname
code_list_filtered = ddb1.get_missing_code_date_in_tb(tbName,date,code_list)
if code_list_filtered: if code_list_filtered:
logger.info(f"getting {'+'.join(code_list_filtered)} on {date}") logger.info(f"Need to download {'+'.join(code_list_filtered)} on {date} in {tbName}")
else: else:
logger.info(f"all checked in database") logger.info(f"all codes checked in database {tbName} on {date}")
ddb1.close_sess() ddb1.close_sess()
del ddb1 del ddb1
return code_list_filtered return code_list_filtered
def run_pool_add_byday_code_init_tick(date_list,code_list,if_check=1): def run_pool_add_by_datelist_codeinit(typ,date_list,code_list,if_check=1):
df_list=[] df_list=[]
for date in date_list: for date in date_list:
if if_check: if if_check:
code_list_filtered = check_if_date_codelist_exists(date,code_list) code_list_filtered = check_if_date_codelist_exists(typ,date,code_list)
else: else:
code_list_filtered = code_list code_list_filtered = code_list
with TSLfm() as tsl: with TSLfm() as tsl:
df = tsl.process_result_data_type(tsl.get_trade_tick(date,date,code_list_filtered)) if typ == 'tick':
df = tsl.process_result_data_type(tsl.get_trade_tick(date,date,code_list_filtered))
elif typ == 'mink':
df = tsl.process_result_data_type(tsl.get_mkt_min_k(date,date,code_list_filtered))
if not df.empty: if not df.empty:
df_list.append(df) df_list.append(df)
if not df_list:
return 0
df_all = pd.concat(df_list) df_all = pd.concat(df_list)
ddb2 = DDBfm(running_which_env,pool=True) ddb2 = DDBfm(running_which_env,pool=True)
@ -186,27 +198,28 @@ def run_pool_dates_by_code_init_n_group(typ='mink',gp_amt=10,start_date='2022010
code_list = all_code_dict_by_init[code_init] code_list = all_code_dict_by_init[code_init]
if typ=='mink': if typ=='mink':
# logger.info('Running mink') # logger.info('Running mink')
logger.error('mink by day to be fixed') run_pool_add_by_datelist_codeinit('mink',date_list,code_list,if_check)
# run_pool_add_byday_code_init_minKline(date_list,code_list) # run_pool_add_byday_code_init_minKline(date_list,code_list)
elif typ=='tick': elif typ=='tick':
logger.info('Running tick') logger.info('Running tick')
run_pool_add_byday_code_init_tick(date_list,code_list,if_check) run_pool_add_by_datelist_codeinit('tick',date_list,code_list,if_check)
if __name__ == '__main__': if __name__ == '__main__':
import time import time
# run()
run_create_hft_db() # including two tables # run_create_hft_db() # including two tables
tic = time.perf_counter() tic = time.perf_counter()
typ='tick' typ='mink'
st_d='20220601' st_d='20221101'
en_d='20221031' en_d='20221102'
if_check = 0 if_check = 1
logger.info(f"Going to run {typ} from {st_d} to {en_d} with if_check dupliactes={if_check}")
logger.info(f"Going to run {typ} from {st_d} to {en_d} with if_check dupliactes={if_check} in {running_which_env}, plz check if this info is correct.\n\n\n\n")
run_pool_dates_by_code_init_n_group(typ=typ,gp_amt=3,start_date=st_d,end_date=en_d,if_check=if_check) run_pool_dates_by_code_init_n_group(typ=typ,gp_amt=3,start_date=st_d,end_date=en_d,if_check=if_check)
# run_pool_dates_by_code_init_n_group(typ='mink',group_amount=5) # run_pool_dates_by_code_init_n_group(typ='mink',group_amount=5)

@ -9,9 +9,10 @@ listTables("dfs:/hft_futuremarket_ts");
pt=loadTable("dfs://hft_futuremarket_ts","MinKlinePartitioned") pt=loadTable("dfs://hft_futuremarket_ts","MinKlinePartitioned")
pt=loadTable("dfs://hft_futuremarket_ts","TickPartitioned") pt=loadTable("dfs://hft_futuremarket_ts","TickPartitioned")
select top 40 * from pt where code=`T2212 and m_nDatetime.date() >=2022.01.01d select top 40 * from pt where code=`T2212 and m_nDatetime.date() =2022.09.28d
select top 4 * from pt where code_init=`TS and m_nDatetime.date()>=2022.01.01d select top 4 * from pt where code_init=`T and m_nDatetime.date()=2022.09.28d
select distinct m_nDatetime.date() from pt where code=`AP2211 select distinct m_nDatetime.date() from pt where code=`AP2211
select distinct code from pt
select count(*) from loadTable("dfs://hft_futuremarket_ts", "MinKlinePartitioned") select count(*) from loadTable("dfs://hft_futuremarket_ts", "MinKlinePartitioned")
select count(*) from loadTable("dfs://hft_futuremarket_ts", "TickPartitioned") select count(*) from loadTable("dfs://hft_futuremarket_ts", "TickPartitioned")

Loading…
Cancel
Save