From 4e35f2de8bc2b5bb9b2d5d185dfb974eae10a019 Mon Sep 17 00:00:00 2001 From: yzlocal Date: Fri, 11 Nov 2022 13:47:30 +0800 Subject: [PATCH] more updates --- src/DDBfm.py | 22 ++++++++++++---------- src/TSLfm.py | 4 ++-- src/data_loader.py | 47 +++++++++++++++++++++++++++++----------------- test/test.dos | 5 +++-- 4 files changed, 47 insertions(+), 31 deletions(-) diff --git a/src/DDBfm.py b/src/DDBfm.py index ab9e601..d06eb4c 100644 --- a/src/DDBfm.py +++ b/src/DDBfm.py @@ -60,13 +60,13 @@ class DDBfm(): self.pool.shutDown() def close_sess(self): - if self.pool: - while not self.pool.isFinished: - logger.info("pool not finished,sleep for 5 seconds.") - time.sleep(5) - print("pool finished") - logger.debug("pool finished") - self.pool.shutDown() + # if self.pool: + # while not self.pool.isFinished: + # logger.info("pool not finished,sleep for 5 seconds.") + # time.sleep(5) + # print("pool finished") + # logger.debug("pool finished") + # self.pool.shutDown() if self.sess: self.sess.clearAllCache() @@ -143,16 +143,16 @@ class DDBfm(): def load_tb(self,tableName): return self.sess.loadTable(dbPath=self.ddb_hft_dbPath, tableName=tableName) - def get_missing_code_date_in_tb(self,tb,curr_date,code_list): # code list has to be like all CF + def get_missing_code_date_in_tb(self,tbName,curr_date,code_list): # code list has to be like all CF curr_date_formatted = curr_date[:4]+'.'+curr_date[4:6]+'.'+curr_date[6:] # print('?did i split this right') # print(curr_date_formatted) - # tb = self.sess.loadTable(dbPath=self.ddb_hft_dbPath, tableName=tbName) + tb = self.sess.loadTable(dbPath=self.ddb_hft_dbPath, tableName=tbName) logger.info(f"Quickly checking if data on {'+'.join(code_list)} {curr_date} exists...") # could do a slow checking of num of data try: # doing this cuz there's no method to check if a table is empty lol # cond=f"code=`{curr_code}, m_nDatetime.date()={curr_date_formatted}d" - cond=f"code_init=`{code_list[0][:2]}, m_nDatetime.date()={curr_date_formatted}d" + 
cond=f"code_init=`{code_list[0][:-4]}, m_nDatetime.date()={curr_date_formatted}d" # print(cond) df = tb.select('distinct code').where(cond).toDF() @@ -167,6 +167,8 @@ class DDBfm(): for c in code_list: if c not in ddb_code_list: return_code_list.append(c) + else: + logger.warning(f'{c} on {curr_date} is recorded') return return_code_list diff --git a/src/TSLfm.py b/src/TSLfm.py index a02869d..e5c48a6 100644 --- a/src/TSLfm.py +++ b/src/TSLfm.py @@ -215,7 +215,7 @@ class TSLfm: if df.empty: logger.info('No data on this day.') return pd.DataFrame() - logger.debug(f"Processing new df of shape {df.shape}, which looks like\n{df.head(5)}") + # logger.debug(f"Processing new df of shape {df.shape}, which looks like\n{df.head(5)}") # new = df["m_nDatetime"].str.split(" ", n = 1, expand = True) df["m_nDatetime"]=df["m_nDatetime"].astype('datetime64') @@ -230,7 +230,7 @@ class TSLfm: df[col]=df[col].astype(np.int8) logger.info(f"Processing result df of shape {df.shape} done") - logger.debug(f"New df looks like\n{df.head(5)}") + logger.debug(f"New df looks like\n{df.head(1)}") return df diff --git a/src/data_loader.py b/src/data_loader.py index 97751eb..1742656 100644 --- a/src/data_loader.py +++ b/src/data_loader.py @@ -1,6 +1,6 @@ import sys -running_which_env='prd' +running_which_env='dev' from os.path import dirname, abspath, join @@ -69,31 +69,43 @@ def run_add_1day_code_init_minKline(date,code_list): # del ddb2 -def check_if_date_codelist_exists(date,code_list): +def check_if_date_codelist_exists(typ,date,code_list): code_list_filtered=code_list ddb1 = DDBfm(running_which_env) - code_list_filtered = ddb1.get_missing_code_date_in_tb(ddb1.ddf_hft_mink_tbname,date,code_list) + + if typ=='tick': + tbName = ddb1.ddf_hft_tick_tbname + elif typ=='mink': + tbName = ddb1.ddf_hft_mink_tbname + + code_list_filtered = ddb1.get_missing_code_date_in_tb(tbName,date,code_list) + if code_list_filtered: - logger.info(f"getting {'+'.join(code_list_filtered)} on {date}") + 
logger.info(f"Need to download {'+'.join(code_list_filtered)} on {date} in {tbName}") else: - logger.info(f"all checked in database") + logger.info(f"all codes checked in database {tbName} on {date}") ddb1.close_sess() del ddb1 return code_list_filtered -def run_pool_add_byday_code_init_tick(date_list,code_list,if_check=1): +def run_pool_add_by_datelist_codeinit(typ,date_list,code_list,if_check=1): df_list=[] for date in date_list: if if_check: - code_list_filtered = check_if_date_codelist_exists(date,code_list) + code_list_filtered = check_if_date_codelist_exists(typ,date,code_list) else: code_list_filtered = code_list with TSLfm() as tsl: - df = tsl.process_result_data_type(tsl.get_trade_tick(date,date,code_list_filtered)) + if typ == 'tick': + df = tsl.process_result_data_type(tsl.get_trade_tick(date,date,code_list_filtered)) + elif typ == 'mink': + df = tsl.process_result_data_type(tsl.get_mkt_min_k(date,date,code_list_filtered)) if not df.empty: df_list.append(df) + if not df_list: + return 0 df_all = pd.concat(df_list) ddb2 = DDBfm(running_which_env,pool=True) @@ -186,27 +198,28 @@ def run_pool_dates_by_code_init_n_group(typ='mink',gp_amt=10,start_date='2022010 code_list = all_code_dict_by_init[code_init] if typ=='mink': # logger.info('Running mink') - logger.error('mink by day to be fixed') + run_pool_add_by_datelist_codeinit('mink',date_list,code_list,if_check) # run_pool_add_byday_code_init_minKline(date_list,code_list) elif typ=='tick': logger.info('Running tick') - run_pool_add_byday_code_init_tick(date_list,code_list,if_check) + run_pool_add_by_datelist_codeinit('tick',date_list,code_list,if_check) if __name__ == '__main__': import time - # run() - run_create_hft_db() # including two tables + + # run_create_hft_db() # including two tables tic = time.perf_counter() - typ='tick' - st_d='20220601' - en_d='20221031' - if_check = 0 - logger.info(f"Going to run {typ} from {st_d} to {en_d} with if_check dupliactes={if_check}") + typ='mink' + st_d='20221101' + 
en_d='20221102' + if_check = 1 + + logger.info(f"Going to run {typ} from {st_d} to {en_d} with if_check dupliactes={if_check} in {running_which_env}, plz check if this info is correct.\n\n\n\n") run_pool_dates_by_code_init_n_group(typ=typ,gp_amt=3,start_date=st_d,end_date=en_d,if_check=if_check) # run_pool_dates_by_code_init_n_group(typ='mink',group_amount=5) diff --git a/test/test.dos b/test/test.dos index 1fa3fda..74a6315 100644 --- a/test/test.dos +++ b/test/test.dos @@ -9,9 +9,10 @@ listTables("dfs:/hft_futuremarket_ts"); pt=loadTable("dfs://hft_futuremarket_ts","MinKlinePartitioned") pt=loadTable("dfs://hft_futuremarket_ts","TickPartitioned") -select top 40 * from pt where code=`T2212 and m_nDatetime.date() >=2022.01.01d -select top 4 * from pt where code_init=`TS and m_nDatetime.date()>=2022.01.01d +select top 40 * from pt where code=`T2212 and m_nDatetime.date() =2022.09.28d +select top 4 * from pt where code_init=`T and m_nDatetime.date()=2022.09.28d select distinct m_nDatetime.date() from pt where code=`AP2211 +select distinct code from pt select count(*) from loadTable("dfs://hft_futuremarket_ts", "MinKlinePartitioned") select count(*) from loadTable("dfs://hft_futuremarket_ts", "TickPartitioned")