From 5a2439c7f63af176d5365a45a5de713ba2c65337 Mon Sep 17 00:00:00 2001 From: yzlocal Date: Fri, 11 Nov 2022 15:59:14 +0800 Subject: [PATCH] cleaned up and updated methods --- src/DDBfm.py | 51 ++++++---- src/data_loader.py | 248 ++++++++++++++++++++++----------------------- test/test.dos | 3 +- 3 files changed, 157 insertions(+), 145 deletions(-) diff --git a/src/DDBfm.py b/src/DDBfm.py index d06eb4c..c18d9ba 100644 --- a/src/DDBfm.py +++ b/src/DDBfm.py @@ -50,23 +50,23 @@ class DDBfm(): else: self.pool = ddb.DBConnectionPool(self.ddb_config['host'], 8848, 12, self.ddb_config['username'], self.ddb_config['password']) - def clear_pool(self): + # def clear_pool(self): + # if self.pool: + # while not self.pool.isFinished: + # logger.info("pool not finished,sleep for 5 seconds.") + # time.sleep(5) + # print("pool finished") + # logger.debug("pool finished") + # self.pool.shutDown() + + def close_sess(self): if self.pool: while not self.pool.isFinished: logger.info("pool not finished,sleep for 5 seconds.") time.sleep(5) - print("pool finished") + # print("pool finished") logger.debug("pool finished") self.pool.shutDown() - - def close_sess(self): - # if self.pool: - # while not self.pool.isFinished: - # logger.info("pool not finished,sleep for 5 seconds.") - # time.sleep(5) - # print("pool finished") - # logger.debug("pool finished") - # self.pool.shutDown() if self.sess: self.sess.clearAllCache() @@ -143,26 +143,37 @@ class DDBfm(): def load_tb(self,tableName): return self.sess.loadTable(dbPath=self.ddb_hft_dbPath, tableName=tableName) - def get_missing_code_date_in_tb(self,tbName,curr_date,code_list): # code list has to be like all CF + def get_missing_code_date_in_tb(self,tbName,curr_date,code_list): # code list has to be like all CF 2022-11-11 fixed it + logger.info(f"Quickly checking if data on {'+'.join(code_list)} {curr_date} exists...") # could do a slow checking of num of data + curr_date_formatted = curr_date[:4]+'.'+curr_date[4:6]+'.'+curr_date[6:] # print('?did i split this right') # print(curr_date_formatted) - tb = self.sess.loadTable(dbPath=self.ddb_hft_dbPath, tableName=tbName) - logger.info(f"Quickly checking if data on {'+'.join(code_list)} {curr_date} exists...") # could do a slow checking of num of data + # tb = self.sess.loadTable(dbPath=self.ddb_hft_dbPath, tableName=tbName) try: - # doing this cuz there's no method to check if a table is empty lol - # cond=f"code=`{curr_code}, m_nDatetime.date()={curr_date_formatted}d" - cond=f"code_init=`{code_list[0][:-4]}, m_nDatetime.date()={curr_date_formatted}d" - # print(cond) - df = tb.select('distinct code').where(cond).toDF() + + df = self.sess.run(f"""pt=loadTable("{self.ddb_hft_dbPath}","{tbName}"); + select distinct code from pt where m_nDatetime.date()={curr_date_formatted}d and code in ({" ".join(["`"+c for c in code_list])}); + """) + # # doing this cuz there's no method to check if a table is empty lol + # # cond=f"code=`{curr_code}, m_nDatetime.date()={curr_date_formatted}d" + # # cond=f"code_init=`{code_list[0][:-4]}, m_nDatetime.date()={curr_date_formatted}d" + + # # code_list_str = " or ".join([f'code=`{c}' for c in code_list]) + # cond=f"""`m_nDatetime.date()={curr_date_formatted}d and code in ({" ".join(["`"+c for c in code_list])})""" + + # print(cond) + # ddb1.run("""select distinct code from pt where m_nDatetime.date()=2022.11.01d and code in (`AP2305 `AP2310)""") + # df = tb.select('distinct code').where(cond).toDF() if df.empty or df.shape[0]==0: # print(df) return code_list except: return code_list + ddb_code_list = df['distinct_code'].to_list() - print(ddb_code_list) + # print(ddb_code_list) return_code_list=[] for c in code_list: if c not in ddb_code_list: diff --git a/src/data_loader.py b/src/data_loader.py index 1742656..82aa56f 100644 --- a/src/data_loader.py +++ b/src/data_loader.py @@ -1,87 +1,60 @@ +from code_list import code_list_pickel +from TSLfm import TSLfm +from DDBfm import DDBfm +import pandas as pd +from loguru import logger +from os.path import dirname, abspath, join import sys -running_which_env='dev' +running_which_env = 'prd' -from os.path import dirname, abspath, join ROOT_DIR = abspath(join(dirname(abspath(__file__)), "..")) -from loguru import logger logger.remove() logger.add(sys.stderr, level="INFO") -logger.add(ROOT_DIR+"/logs/{time:YYYYMMDD-HHmmss}"+f"_{running_which_env}.log", rotation="10 MB", compression="zip", level="DEBUG") +logger.add(ROOT_DIR+"/logs/{time:YYYYMMDD-HHmmss}"+f"_{running_which_env}.log", + rotation="10 MB", compression="zip", level="DEBUG") -import pandas as pd - -from DDBfm import DDBfm -from TSLfm import TSLfm - -from code_list import code_list_pickel - -def run_add_1day_code_init_minKline(date,code_list): +def run_add_1day_code_init_minKline(date, code_list): + """ + too slow. depracated. + """ ddb = DDBfm(running_which_env) code_list_filtered = [] for code in code_list: - if ddb.search_code_date_in_tb(ddb.ddf_hft_mink_tbname,date,code): + if ddb.search_code_date_in_tb(ddb.ddf_hft_mink_tbname, date, code): logger.warning(f"Possible duplicates on {date} and {code}") else: code_list_filtered.append(code) - if len(code_list_filtered)==0: + if len(code_list_filtered) == 0: return 0 with TSLfm() as tsl: - df = tsl.process_result_data_type(tsl.get_mkt_min_k(date,date,code_list_filtered)) + df = tsl.process_result_data_type( + tsl.get_mkt_min_k(date, date, code_list_filtered)) if not df.empty: - logger.info(f'Getting a df of {df.shape}: {code_list[0][:-4]} on {date}') - ddb.append_hft_table(ddb.ddf_hft_mink_tbname,df) - -# def run_pool_add_byday_code_init_minKline(date_list,code_list): -# df_list=[] -# code_list_filtered=code_list -# ddb1 = DDBfm(running_which_env) - -# tb=ddb1.load_tb(tableName=ddb1.ddf_hft_mink_tbname) -# # tb=ddb1.sess.loadTable(dbPath=ddb1.ddb_hft_dbPath, tableName=ddb1.ddf_hft_mink_tbname) -# for date in date_list: -# with TSLfm() as tsl: -# df = tsl.process_result_data_type(tsl.get_mkt_min_k(date,date,code_list)) -# if df.empty: -# continue - -# code_list_filtered = ddb1.get_missing_code_date_in_tb(tb,date,code_list) -# if len(code_list_filtered)==0: -# continue -# logger.info(f"getting {'+'.join(code_list_filtered)} on {date}") - -# df=df[df['code'].isin(code_list_filtered)] -# df_list.append(df) -# ddb1.close_sess() -# del ddb1 - -# if df_list: -# df_all = pd.concat(df_list) - -# ddb2 = DDBfm(running_which_env,pool=True) -# logger.info(f'Getting a df of {df_all.shape}: {code_list[0][:-4]} on {"+".join(date_list)}') -# ddb2.append_pool_hft_table(ddb2.ddf_hft_mink_tbname,df_all) -# ddb2.clear_pool() -# del ddb2 - - -def check_if_date_codelist_exists(typ,date,code_list): - code_list_filtered=code_list + logger.info( + f'Getting a df of {df.shape}: {code_list[0][:-4]} on {date}') + ddb.append_hft_table(ddb.ddf_hft_mink_tbname, df) + + +def check_if_date_codelist_exists(typ, date, code_list): + code_list_filtered = code_list ddb1 = DDBfm(running_which_env) - if typ=='tick': + if typ == 'tick': tbName = ddb1.ddf_hft_tick_tbname - elif typ=='mink': + elif typ == 'mink': tbName = ddb1.ddf_hft_mink_tbname - - code_list_filtered = ddb1.get_missing_code_date_in_tb(tbName,date,code_list) + + code_list_filtered = ddb1.get_missing_code_date_in_tb( + tbName, date, code_list) if code_list_filtered: - logger.info(f"Need to download {'+'.join(code_list_filtered)} on {date} in {tbName}") + logger.info( + f"Need to download {'+'.join(code_list_filtered)} on {date} in {tbName}") else: logger.info(f"all codes checked in database {tbName} on {date}") ddb1.close_sess() @@ -89,122 +62,145 @@ def check_if_date_codelist_exists(typ,date,code_list): return code_list_filtered -def run_pool_add_by_datelist_codeinit(typ,date_list,code_list,if_check=1): - df_list=[] - +def run_pool_add_by_datelist_codeinit(typ, date_list, code_list, if_check=1): + df_list = [] + for date in date_list: if if_check: - code_list_filtered = check_if_date_codelist_exists(typ,date,code_list) + code_list_filtered = check_if_date_codelist_exists( + typ, date, code_list) else: code_list_filtered = code_list with TSLfm() as tsl: if typ == 'tick': - df = tsl.process_result_data_type(tsl.get_trade_tick(date,date,code_list_filtered)) + df = tsl.process_result_data_type( + tsl.get_trade_tick(date, date, code_list_filtered)) elif typ == 'mink': - df = tsl.process_result_data_type(tsl.get_mkt_min_k(date,date,code_list_filtered)) + df = tsl.process_result_data_type( + tsl.get_mkt_min_k(date, date, code_list_filtered)) if not df.empty: df_list.append(df) if not df_list: return 0 df_all = pd.concat(df_list) - ddb2 = DDBfm(running_which_env,pool=True) - logger.info(f'Getting a df of {df_all.shape}: {code_list[0][:-4]} on {"+".join(date_list)}') - ddb2.append_pool_hft_table(ddb2.ddf_hft_tick_tbname,df_all) + ddb2 = DDBfm(running_which_env, pool=True) + logger.info( + f'Getting a df of {df_all.shape}: {code_list[0][:-4]} on {"+".join(date_list)}') + if typ == 'tick': + ddb2.append_pool_hft_table(ddb2.ddf_hft_tick_tbname, df_all) + elif typ == 'mink': + ddb2.append_pool_hft_table(ddb2.ddf_hft_mink_tbname, df_all) ddb2.close_sess() del ddb2 -def run_create_hft_db(date = '20221101'): - +def run_create_hft_db(date='20221101', if_mink=1, if_tick=1): + code_list = ['T2212'] ddb = DDBfm(running_which_env) ddb.create_hft_database() - with TSLfm() as tsl: - code_list=['T2212'] - df_mink = tsl.process_result_data_type(tsl.get_mkt_min_k(date,date,code_list)) - # print(df) - ddb.create_hft_table(ddb.ddf_hft_mink_tbname,df_mink) + if if_mink: + logger.info(f"creating mink on {date} for {code_list[0]}") + with TSLfm() as tsl: + df_mink = tsl.process_result_data_type( + tsl.get_mkt_min_k(date, date, code_list)) + # print(df) + ddb.create_hft_table(ddb.ddf_hft_mink_tbname, df_mink) - with TSLfm() as tsl: - code_list=['T2212'] - df_tick = tsl.process_result_data_type(tsl.get_trade_tick(date,date,code_list)) - # print(df) + if if_tick: + logger.info(f"creating tick on {date} for {code_list[0]}") + with TSLfm() as tsl: + df_tick = tsl.process_result_data_type( + tsl.get_trade_tick(date, date, code_list)) + # print(df) - ddb.create_hft_table(ddb.ddf_hft_tick_tbname,df_tick) + ddb.create_hft_table(ddb.ddf_hft_tick_tbname, df_tick) - def run(): - - all_code_dict_by_init={} + """ + too slow. depracated. + """ + all_code_dict_by_init = {} for c in code_list_pickel: init = c[:-4] if init in all_code_dict_by_init: all_code_dict_by_init[init].append(c) else: - all_code_dict_by_init[init]=[c] + all_code_dict_by_init[init] = [c] # print(all_code_dict_by_init) - start_date='2022-09-30' - end_date='2022-10-31' - allDates = pd.date_range(start_date, end_date, freq ='D') - allDates = [i.replace('-','') for i in list(allDates.astype('str'))] + start_date = '2022-09-30' + end_date = '2022-10-31' + allDates = pd.date_range(start_date, end_date, freq='D') + allDates = [i.replace('-', '') for i in list(allDates.astype('str'))] for date in allDates: - for ind,code_init in enumerate(all_code_dict_by_init): + for ind, code_init in enumerate(all_code_dict_by_init): logger.info(f"Getting {code_init} (no.{ind})") code_list = all_code_dict_by_init[code_init] - run_add_1day_code_init_minKline(date,code_list) - + run_add_1day_code_init_minKline(date, code_list) - -def run_pool_dates_by_code_init_n_group(typ='mink',gp_amt=10,start_date='20220101',end_date='20221031',if_check=1): +def run_pool_dates_by_code_init_n_group(typ='mink', code_gp_amt=10, date_gp_amt=10, start_date='20220101', end_date='20221031', if_check=1, code_dict_by='init'): logger.info("Running run_pool_dates_by_group") - all_code_dict_by_init={} + + all_code_dict_by_init = {} for c in code_list_pickel: init = c[:-4] if init in all_code_dict_by_init: all_code_dict_by_init[init].append(c) else: - all_code_dict_by_init[init]=[c] - - # print(all_code_dict_by_init) - - - allDates = pd.date_range(start_date, end_date, freq ='D') - dates_dict_by_day={} + all_code_dict_by_init[init] = [c] + + if code_dict_by == 'init': + all_code_dict = all_code_dict_by_init + if code_dict_by == 'group': + all_code_dict_by_group_no = {} + for ind, code_init in enumerate(sorted(all_code_dict_by_init)): + group_no = ind % code_gp_amt + if group_no not in all_code_dict_by_group_no: + all_code_dict_by_group_no[group_no] = all_code_dict_by_init[code_init] + else: + all_code_dict_by_group_no[group_no] += all_code_dict_by_init[code_init] + all_code_dict = all_code_dict_by_group_no + + allDates = pd.date_range(start_date, end_date, freq='D') + dates_dict_by_day = {} for d in list(allDates.astype('str')): - group_no = int(d[-2:])%gp_amt + group_no = int(d[-2:]) % date_gp_amt if group_no not in dates_dict_by_day: - dates_dict_by_day[group_no] = [d.replace('-','')] + dates_dict_by_day[group_no] = [d.replace('-', '')] else: - dates_dict_by_day[group_no].append(d.replace('-','')) + dates_dict_by_day[group_no].append(d.replace('-', '')) logger.debug(dates_dict_by_day) - for group_no in dates_dict_by_day: - date_list=dates_dict_by_day[group_no] - num_of_init = len(all_code_dict_by_init) - for ind,code_init in enumerate(all_code_dict_by_init): + date_list = dates_dict_by_day[group_no] + num_of_code_group = len(all_code_dict) + for ind, code_init in enumerate(all_code_dict): # done: 'T','TS','TS','TF' # if code_init in ['T']: # todo filtered this ,,'TF', 'IC','IF','IH','IM' - logger.info(f"Getting {code_init} (no.{ind}/{num_of_init} of group {group_no}/{gp_amt})") - code_list = all_code_dict_by_init[code_init] - if typ=='mink': - # logger.info('Running mink') - run_pool_add_by_datelist_codeinit('mink',date_list,code_list,if_check) - - # run_pool_add_byday_code_init_minKline(date_list,code_list) - elif typ=='tick': - logger.info('Running tick') - run_pool_add_by_datelist_codeinit('tick',date_list,code_list,if_check) - + logger.info( + f"Getting {code_init} (no.{ind}/{num_of_code_group} of date_group {group_no}/{date_gp_amt})") + code_list = all_code_dict[code_init] + if typ == 'mink': + # logger.info('Running mink') + print(date_list) + print(code_list) + run_pool_add_by_datelist_codeinit( + 'mink', date_list, code_list, if_check) + + # run_pool_add_byday_code_init_minKline(date_list,code_list) + elif typ == 'tick': + logger.info('Running tick') + run_pool_add_by_datelist_codeinit( + 'tick', date_list, code_list, if_check) if __name__ == '__main__': @@ -214,13 +210,17 @@ if __name__ == '__main__': tic = time.perf_counter() - typ='mink' - st_d='20221101' - en_d='20221102' + typ = 'mink' + st_d = '20221102' + en_d = '20221103' if_check = 1 + split_code_into_howmany_groups_no = 10 + split_date_into_howmany_groups = 5 - logger.info(f"Going to run {typ} from {st_d} to {en_d} with if_check dupliactes={if_check} in {running_which_env}, plz check if this info is correct.\n\n\n\n") - run_pool_dates_by_code_init_n_group(typ=typ,gp_amt=3,start_date=st_d,end_date=en_d,if_check=if_check) + logger.info( + f"Going to run {typ} from {st_d} to {en_d} with if_check dupliactes={if_check} in {running_which_env}, plz check if this info is correct.\n\n\n\n") + run_pool_dates_by_code_init_n_group(typ=typ, code_gp_amt=split_code_into_howmany_groups_no, + date_gp_amt=split_date_into_howmany_groups, start_date=st_d, end_date=en_d, if_check=if_check, code_dict_by='group') # run_pool_dates_by_code_init_n_group(typ='mink',group_amount=5) toc = time.perf_counter() @@ -229,4 +229,4 @@ if __name__ == '__main__': # all t taks Running used 588.5782 seconds for 10 months # 600/60=10 min 12min for take code_init - # 12* 71 = 850 min / 60 = 15 hr for all code for each year \ No newline at end of file + # 12* 71 = 850 min / 60 = 15 hr for all code for each year diff --git a/test/test.dos b/test/test.dos index 74a6315..e4be408 100644 --- a/test/test.dos +++ b/test/test.dos @@ -11,8 +11,9 @@ pt=loadTable("dfs://hft_futuremarket_ts","TickPartitioned") select top 40 * from pt where code=`T2212 and m_nDatetime.date() =2022.09.28d select top 4 * from pt where code_init=`T and m_nDatetime.date()=2022.09.28d -select distinct m_nDatetime.date() from pt where code=`AP2211 +select distinct m_nDatetime.date() from pt where m_nDatetime.date()=2022.11.01d and (code=`AP2211 or code=`AP2212) select distinct code from pt +select distinct code from pt where m_nDatetime.date()=2022.11.01d and code in (`LR2211) select count(*) from loadTable("dfs://hft_futuremarket_ts", "MinKlinePartitioned") select count(*) from loadTable("dfs://hft_futuremarket_ts", "TickPartitioned")