From c79fb69145450d7950a95e0f6d2af8d75d7e5a7f Mon Sep 17 00:00:00 2001 From: yzlocal Date: Fri, 18 Nov 2022 09:16:21 +0800 Subject: [PATCH] update getting index line --- src/code_list_all_pkl.py | 2 +- src/data_loader.py | 124 +++++++++++++++++++++++++-------------- src/data_loader_zl.py | 49 ++++++++++------ test/test.dos | 2 +- 4 files changed, 111 insertions(+), 66 deletions(-) diff --git a/src/code_list_all_pkl.py b/src/code_list_all_pkl.py index b2a9c8b..61600ec 100644 --- a/src/code_list_all_pkl.py +++ b/src/code_list_all_pkl.py @@ -1,5 +1,5 @@ all_fm_init_curr=['sc', 'v', 'TS', 'MA', 'AP', 'jm', 'bc', 'bb', 'fu', 'IM', 'IF', 'a', 'lu', 'FG', 'cu', 'al', 'IH', 'RS', 'pg', 'CF', 'SF', 'ni', 'hc', 'UR', 'm', 'SR', 'j', 'PF', 'RM', 'T', 'c', 'JR', 'l', 'p', 'sp', 'CY', 'pb', 'TF', 'b', 'eg', 'rb', 'PK', 'sn', 'nr', 'pp', 'CJ', 'eb', 'SA', 'y', 'RI', 'lh', 'jd', 'OI', 'WH', 'ss', 'ru', 'zn', 'fb', 'rr', 'PM', 'au', 'TA', 'ZC', 'IC', 'bu', 'SM', 'wr', 'cs', 'LR', 'ag', 'i'] - +tsl_ind_codeinit_mapping={'ICInd': 'IC', 'IFInd': 'IF', 'IHInd': 'IH', 'IMInd': 'IM', 'TInd': 'T', 'TFInd': 'TF', 'TSInd': 'TS', 'QI000001': 'al', 'QI000002': 'au', 'QI000003': 'cu', 'QI000004': 'fu', 'QI000005': 'ru', 'QI000006': 'zn', 'QI000007': 'CF', 'QI000009': 'RO', 'QI000010': 'SR', 'QI000011': 'TA', 'QI000012': 'WS', 'QI000013': 'WT', 'QI000014': 'a', 'QI000015': 'b', 'QI000016': 'c', 'QI000017': 'l', 'QI000018': 'm', 'QI000019': 'p', 'QI000020': 'y', 'QI000021': 'ER', 'QI000022': 'wr', 'QI000023': 'rb', 'QI000024': 'v', 'QI000025': 'pb', 'QI000026': 'j', 'QI000027': 'ME', 'QI000028': 'ag', 'QI000029': 'FG', 'QI000030': 'RS', 'QI000031': 'RM', 'QI000032': 'jm', 'QI000033': 'bu', 'QI000034': 'i', 'QI000035': 'ZC', 'QI000036': 'fb', 'QI000037': 'bb', 'QI000038': 'JR', 'QI000039': 'jd', 'QI000040': 'hc', 'QI000041': 'pp', 'QI000042': 'LR', 'QI000043': 'SF', 'QI000044': 'SM', 'QI000045': 'cs', 'QI000046': 'ni', 'QI000047': 'sn', 'QI000048': 'CY', 'QI000049': 'AP', 'QI000050': 'sc', 'QI000051': 'sp', 'QI000052': 'eg', 'QI000053': 'CJ', 'QI000054': 'nr', 'QI000055': 'UR', 'QI000056': 'rr', 'QI000057': 'ss', 'QI000058': 'eb', 'QI000059': 'SA', 'QI000060': 'pg', 'QI000061': 'lu', 'QI000062': 'PF', 'QI000063': 'bc', 'QI000064': 'lh', 'QI000065': 'PK'} code_init_case_mapping={'ap': 'AP', 'fg': 'FG', 'rm': 'RM', diff --git a/src/data_loader.py b/src/data_loader.py index 92e931a..8d153fb 100644 --- a/src/data_loader.py +++ b/src/data_loader.py @@ -1,12 +1,13 @@ -from code_list_all_pkl import code_list as code_list_pickel_from_file, all_fm_init_curr +from code_list_all_pkl import code_list as code_list_pickel_from_file, all_fm_init_curr,tsl_ind_codeinit_mapping from TSLfm import TSLfm from DDBfm import DDBfm import pandas as pd +import numpy as np from loguru import logger from os.path import dirname, abspath, join import sys -running_which_env = 'prd' +running_which_env = 'dev' def run_add_1day_code_init_minKline(date, code_list): @@ -32,6 +33,12 @@ def run_add_1day_code_init_minKline(date, code_list): f'Getting a df of {df.shape}: {"+".join(code_list)} on {date}') ddb.append_hft_table(ddb.ddf_hft_mink_tbname, df) +def map_code_tsl_to_9999indcode(code_tsl): + if code_tsl in tsl_ind_codeinit_mapping: + return tsl_ind_codeinit_mapping[code_tsl]+'9999' + else: + logger.error(f'TSL code didnt find mapping for {code_tsl}') + return np.nan def check_if_date_codelist_exists(typ, date, code_list): code_list_filtered=[] @@ -47,7 +54,7 @@ def check_if_date_codelist_exists(typ, date, code_list): if typ == 'tick': tbName = ddb1.ddf_hft_tick_tbname - elif typ == 'mink': + elif typ in ['mink','indl']: tbName = ddb1.ddf_hft_mink_tbname code_list_filtered = ddb1.get_missing_code_date_in_tb( @@ -64,15 +71,26 @@ def check_if_date_codelist_exists(typ, date, code_list): + + def run_pool_add_by_datelist_codeinit(typ, date_list, code_list, if_check=1): df_list = [] + if typ=='indl': + code_list_filtered = [] # real code in ddb + code_list_tsl_code=[] + for c in code_list: + code_list_filtered.append(map_code_tsl_to_9999indcode(c)) + code_list_tsl_code.append(c) + + else: + code_list_filtered= code_list for date in date_list: if if_check: code_list_filtered = check_if_date_codelist_exists( - typ, date, code_list) - else: - code_list_filtered = code_list + typ, date, code_list_filtered) + + with TSLfm() as tsl: if typ == 'tick': df = tsl.process_result_data_type( @@ -80,6 +98,15 @@ def run_pool_add_by_datelist_codeinit(typ, date_list, code_list, if_check=1): elif typ == 'mink': df = tsl.process_result_data_type( tsl.get_mkt_min_k(date, date, code_list_filtered)) + elif typ == 'indl': + df = tsl.process_result_data_type( + tsl.get_mkt_min_k(date, date, code_list_tsl_code)) + + if typ == 'indl' and not df.empty: + # df.rename(columns={'code':'code_tsl'},inplace=True) + df['code']=df['code'].apply(map_code_tsl_to_9999indcode) + df['code_init']=df['code'].apply(lambda x: x[:-4]) + # df.drop('code_tsl',axis=1,inplace=True) if not df.empty: df_list.append(df) if not df_list: @@ -91,7 +118,7 @@ def run_pool_add_by_datelist_codeinit(typ, date_list, code_list, if_check=1): f'Getting a df of {df_all.shape}: {code_list[0][:-4]} on {"+".join(date_list)}') if typ == 'tick': ddb2.append_pool_hft_table(ddb2.ddf_hft_tick_tbname, df_all) - elif typ == 'mink': + elif typ in ['mink','indl']: ddb2.append_pool_hft_table(ddb2.ddf_hft_mink_tbname, df_all) ddb2.close_sess() del ddb2 @@ -149,32 +176,46 @@ def run(): def run_pool_dates_by_code_init_n_group(typ='mink', code_gp_amt=10, date_gp_amt=10, start_date='20220101', end_date='20221031', if_check=1, code_dict_by='init'): logger.info("Running run_pool_dates_by_group") - code_list_pickel=code_list_pickel_from_file - code_list_pickel=sorted(list(set(code_list_pickel))) - all_code_dict_by_init = {} - for c in code_list_pickel: - if c[-4:] < start_date[2:6]: - continue - if c[:-4] in all_fm_init_curr: #todo code that's not in curr is not counted!!! len: s6 - init = c[:-4] - if init in all_code_dict_by_init: - all_code_dict_by_init[init].append(c) - else: - all_code_dict_by_init[init] = [c] - else: - logger.warning("There's unrecognized code init!!!!") - if code_dict_by == 'init': - all_code_dict = all_code_dict_by_init - if code_dict_by == 'group': - all_code_dict_by_group_no = {} - for ind, code_init in enumerate(sorted(all_code_dict_by_init)): - group_no = ind % code_gp_amt - if group_no not in all_code_dict_by_group_no: - all_code_dict_by_group_no[group_no] = all_code_dict_by_init[code_init] + if typ in ['mink','tick']: + code_list_pickel=code_list_pickel_from_file + code_list_pickel=sorted(list(set(code_list_pickel))) + all_code_dict_by_init = {} + for c in code_list_pickel: + if c[-4:] < start_date[2:6]: + continue + if c[:-4] in all_fm_init_curr: #todo code that's not in curr is not counted!!! len: s6 + init = c[:-4] + if init in all_code_dict_by_init: + all_code_dict_by_init[init].append(c) + else: + all_code_dict_by_init[init] = [c] else: - all_code_dict_by_group_no[group_no] += all_code_dict_by_init[code_init] - all_code_dict = all_code_dict_by_group_no + logger.warning("There's unrecognized code init!!!!") + if code_dict_by == 'init': + all_code_dict = all_code_dict_by_init + if code_dict_by == 'group': + all_code_dict_by_group_no = {} + for ind, code_init in enumerate(sorted(all_code_dict_by_init)): + group_no = ind % code_gp_amt + if group_no not in all_code_dict_by_group_no: + all_code_dict_by_group_no[group_no] = all_code_dict_by_init[code_init] + else: + all_code_dict_by_group_no[group_no] += all_code_dict_by_init[code_init] + all_code_dict = all_code_dict_by_group_no + + elif typ in ['indl']: + tsl_ind_code_list = [] + real_code_init_list = [] + all_code_dict={} + for c in sorted(tsl_ind_codeinit_mapping): + if tsl_ind_codeinit_mapping[c] in all_fm_init_curr: + tsl_ind_code_list.append(c) + real_code_init_list.append(tsl_ind_codeinit_mapping[c]) + all_code_dict[tsl_ind_codeinit_mapping[c]]=[c] + else: + logger.warning(f"There's unrecognized code init: {tsl_ind_codeinit_mapping[c]}!!!!") + allDates = pd.date_range(start_date, end_date, freq='D') dates_dict_by_day = {} @@ -199,16 +240,9 @@ def run_pool_dates_by_code_init_n_group(typ='mink', code_gp_amt=10, date_gp_amt= code_list = all_code_dict[code_init] logger.info(date_list) logger.info(code_list) - if typ == 'mink': - # logger.info('Running mink') - run_pool_add_by_datelist_codeinit( - 'mink', date_list, code_list, if_check) + run_pool_add_by_datelist_codeinit( + typ, date_list, code_list, if_check) - # run_pool_add_byday_code_init_minKline(date_list,code_list) - elif typ == 'tick': - logger.info('Running tick') - run_pool_add_by_datelist_codeinit( - 'tick', date_list, code_list, if_check) if __name__ == '__main__': @@ -231,12 +265,12 @@ if __name__ == '__main__': split_code_into_howmany_groups_no = int(split_code_into_howmany_groups_no) split_date_into_howmany_groups=int(split_date_into_howmany_groups) else: - typ = 'tick' - st_d = '20211201' - en_d = '20211231' + typ = 'indl' + st_d = '20221101' + en_d = '20221101' if_check = True - split_code_into_howmany_groups_no = 20 - split_date_into_howmany_groups = 5 + split_code_into_howmany_groups_no = 1 # how many in 1 gp + split_date_into_howmany_groups = 1 logger.add(ROOT_DIR+"/logs/{time:YYYYMMDD-HHmmss}"+f"_{running_which_env}_{typ}_{st_d}_{en_d}_{if_check}_{split_code_into_howmany_groups_no}_{split_date_into_howmany_groups}.log", diff --git a/src/data_loader_zl.py b/src/data_loader_zl.py index 44f2d9a..1e18987 100644 --- a/src/data_loader_zl.py +++ b/src/data_loader_zl.py @@ -6,7 +6,7 @@ from loguru import logger from os.path import dirname, abspath, join import sys -running_which_env = 'prd' +running_which_env = 'dev' def get_code_init_og_code(code): @@ -21,13 +21,13 @@ def process_zl_cl_data(df_zl, df_cl): df_zl.rename(columns={0: 'code_dom'}, inplace=True) df_zl['code_dom'] = df_zl['code_dom'].apply(get_code_init_og_code) df_zl['code_init'] = df_zl['code_dom'].apply(lambda x: x[:-4]) - # print(df_zl) + print(df_zl) df_cl.dropna(inplace=True) df_cl.rename(columns={0: 'code_cont'}, inplace=True) df_cl['code_cont'] = df_cl['code_cont'].apply(get_code_init_og_code) df_cl['code_init'] = df_cl['code_cont'].apply(lambda x: x[:-4]) - # print(df_cl) + print(df_cl) df_cl_new_list = [] for ci, cigp in df_cl.groupby("code_init"): @@ -44,6 +44,8 @@ def process_zl_cl_data(df_zl, df_cl): for i, cont in enumerate(all_cont): df_cont.loc[0, cont_ind_list[i]] = cont df_cl_new_list.append(df_cont) + if not df_cl_new_list: + return pd.DataFrame() df_cl_new = pd.concat(df_cl_new_list) # print(df_cl_new) return pd.merge(df_zl, df_cl_new, on='code_init') @@ -137,26 +139,35 @@ def run_pool_append_zl_table_in_db(start_date, end_date, if_check=True): if __name__ == '__main__': - - typ = 'dailydom' - st_d = '20220101' - en_d = '20221031' - if_check = False + import time ROOT_DIR = abspath(join(dirname(abspath(__file__)), "..")) logger.remove() logger.add(sys.stderr, level="INFO") - logger.add(ROOT_DIR+"/logs/{time:YYYYMMDD-HHmmss}"+f"_{running_which_env}_{typ}_{st_d}_{en_d}_{if_check}.log", - rotation="10 MB", compression="zip", level="DEBUG") + logger.add(ROOT_DIR+"/logs/{time:YYYYMMDD-HHmmss}"+f"_{running_which_env}.log", + rotation="10 MB", compression="zip", level="INFO") + + for i in range(0,21): + - logger.warning( - f"Going to run *{typ}* from {st_d} to {en_d} with if_check dupliactes={if_check} in *{running_which_env}*, plz check if this info is correct.\n\n\n\n") + typ = 'dailydom' + # st_d = '20220101' + # en_d = '20221031' + st_d = str(2000+i)+'0101' + en_d = str(2000+i)+'1231' + + if_check = False - import time - tic = time.perf_counter() + logger.warning( + f"Going to run *{typ}* from {st_d} to {en_d} with if_check dupliactes={if_check} in *{running_which_env}*, plz check if this info is correct.\n\n\n\n") + + + tic = time.perf_counter() + + # run_create_zl_table_in_db() + run_pool_append_zl_table_in_db( + start_date=st_d, end_date=en_d, if_check=if_check) - # run_create_zl_table_in_db() - run_pool_append_zl_table_in_db( - start_date=st_d, end_date=en_d, if_check=if_check) + toc = time.perf_counter() + logger.info(f"Running used {toc - tic:0.4f} seconds") - toc = time.perf_counter() - logger.info(f"Running used {toc - tic:0.4f} seconds") + time.sleep(10) \ No newline at end of file diff --git a/test/test.dos b/test/test.dos index 752b836..c79b362 100644 --- a/test/test.dos +++ b/test/test.dos @@ -23,7 +23,7 @@ select count(*) from loadTable("dfs://hft_futuremarket_ts", "DailyFutureInfoPart pt=loadTable("dfs://daily_stock_ts","daily_kline") -select top 400 * from pt where code_init=`T +select top 400 * from pt where code=`IC9999 schema(pt) select count(*) from pt