cleaned up and updated methods

main
yzlocal 2 years ago
parent 4e35f2de8b
commit 5a2439c7f6

@@ -50,16 +50,7 @@ class DDBfm():
         else:
             self.pool = ddb.DBConnectionPool(self.ddb_config['host'], 8848, 12, self.ddb_config['username'], self.ddb_config['password'])
 
-    def clear_pool(self):
-        if self.pool:
-            while not self.pool.isFinished:
-                logger.info("pool not finished,sleep for 5 seconds.")
-                time.sleep(5)
-            print("pool finished")
-            logger.debug("pool finished")
-            self.pool.shutDown()
-
-    def close_sess(self):
+    # def clear_pool(self):
     #     if self.pool:
     #         while not self.pool.isFinished:
     #             logger.info("pool not finished,sleep for 5 seconds.")
@@ -68,6 +59,15 @@ class DDBfm():
     #             logger.debug("pool finished")
     #             self.pool.shutDown()
+    def close_sess(self):
+        if self.pool:
+            while not self.pool.isFinished:
+                logger.info("pool not finished,sleep for 5 seconds.")
+                time.sleep(5)
+            # print("pool finished")
+            logger.debug("pool finished")
+            self.pool.shutDown()
+
         if self.sess:
             self.sess.clearAllCache()
             self.sess.close()
@@ -143,26 +143,37 @@ class DDBfm():
     def load_tb(self,tableName):
         return self.sess.loadTable(dbPath=self.ddb_hft_dbPath, tableName=tableName)
 
-    def get_missing_code_date_in_tb(self,tbName,curr_date,code_list): # code list has to be like all CF
-        logger.info(f"Quickly checking if data on {'+'.join(code_list)} {curr_date} exists...") # could do a slow checking of num of data
+    def get_missing_code_date_in_tb(self,tbName,curr_date,code_list): # code list has to be like all CF 2022-11-11 fixed it
         curr_date_formatted = curr_date[:4]+'.'+curr_date[4:6]+'.'+curr_date[6:]
         # print('?did i split this right')
         # print(curr_date_formatted)
-        tb = self.sess.loadTable(dbPath=self.ddb_hft_dbPath, tableName=tbName)
+        # tb = self.sess.loadTable(dbPath=self.ddb_hft_dbPath, tableName=tbName)
+        logger.info(f"Quickly checking if data on {'+'.join(code_list)} {curr_date} exists...") # could do a slow checking of num of data
         try:
-            # doing this cuz there's no method to check if a table is empty lol
-            # cond=f"code=`{curr_code}, m_nDatetime.date()={curr_date_formatted}d"
-            cond=f"code_init=`{code_list[0][:-4]}, m_nDatetime.date()={curr_date_formatted}d"
+            df = self.sess.run(f"""pt=loadTable("{self.ddb_hft_dbPath}","{tbName}");
+            select distinct code from pt where m_nDatetime.date()={curr_date_formatted}d and code in ({" ".join(["`"+c for c in code_list])});
+            """)
+            # # doing this cuz there's no method to check if a table is empty lol
+            # # cond=f"code=`{curr_code}, m_nDatetime.date()={curr_date_formatted}d"
+            # # cond=f"code_init=`{code_list[0][:-4]}, m_nDatetime.date()={curr_date_formatted}d"
+            # # code_list_str = " or ".join([f'code=`{c}' for c in code_list])
+            # cond=f"""`m_nDatetime.date()={curr_date_formatted}d and code in ({" ".join(["`"+c for c in code_list])})"""
             # print(cond)
-            df = tb.select('distinct code').where(cond).toDF()
+            # ddb1.run("""select distinct code from pt where m_nDatetime.date()=2022.11.01d and code in (`AP2305 `AP2310)""")
+            # df = tb.select('distinct code').where(cond).toDF()
            if df.empty or df.shape[0]==0:
                 # print(df)
                 return code_list
         except:
             return code_list
         ddb_code_list = df['distinct_code'].to_list()
-        print(ddb_code_list)
+        # print(ddb_code_list)
         return_code_list=[]
         for c in code_list:
             if c not in ddb_code_list:
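
For reference, the script string that the new sess.run call assembles can be reproduced standalone. The sketch below uses made-up codes and a date; the database path and table name are taken from the query file at the end of this commit:

# Illustrative only: reproduces the f-string expansion used by
# get_missing_code_date_in_tb without needing a DolphinDB connection.
ddb_hft_dbPath = "dfs://hft_futuremarket_ts"   # path used by the queries below
tbName = "MinKlinePartitioned"                 # assumed table name for this sketch
curr_date = "20221101"
code_list = ["AP2305", "AP2310"]

curr_date_formatted = curr_date[:4]+'.'+curr_date[4:6]+'.'+curr_date[6:]
script = f"""pt=loadTable("{ddb_hft_dbPath}","{tbName}");
select distinct code from pt where m_nDatetime.date()={curr_date_formatted}d and code in ({" ".join(["`"+c for c in code_list])});
"""
print(script)
# pt=loadTable("dfs://hft_futuremarket_ts","MinKlinePartitioned");
# select distinct code from pt where m_nDatetime.date()=2022.11.01d and code in (`AP2305 `AP2310);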

@@ -1,87 +1,60 @@
+from code_list import code_list_pickel
+from TSLfm import TSLfm
+from DDBfm import DDBfm
+import pandas as pd
+from loguru import logger
+from os.path import dirname, abspath, join
 import sys
-running_which_env='dev'
-from os.path import dirname, abspath, join
+
+running_which_env = 'prd'
+
 ROOT_DIR = abspath(join(dirname(abspath(__file__)), ".."))
-from loguru import logger
+
 logger.remove()
 logger.add(sys.stderr, level="INFO")
-logger.add(ROOT_DIR+"/logs/{time:YYYYMMDD-HHmmss}"+f"_{running_which_env}.log", rotation="10 MB", compression="zip", level="DEBUG")
-import pandas as pd
-from DDBfm import DDBfm
-from TSLfm import TSLfm
-from code_list import code_list_pickel
+logger.add(ROOT_DIR+"/logs/{time:YYYYMMDD-HHmmss}"+f"_{running_which_env}.log",
+           rotation="10 MB", compression="zip", level="DEBUG")
 
-def run_add_1day_code_init_minKline(date,code_list):
+
+def run_add_1day_code_init_minKline(date, code_list):
+    """
+    too slow. depracated.
+    """
     ddb = DDBfm(running_which_env)
     code_list_filtered = []
     for code in code_list:
-        if ddb.search_code_date_in_tb(ddb.ddf_hft_mink_tbname,date,code):
+        if ddb.search_code_date_in_tb(ddb.ddf_hft_mink_tbname, date, code):
             logger.warning(f"Possible duplicates on {date} and {code}")
         else:
             code_list_filtered.append(code)
-    if len(code_list_filtered)==0:
+    if len(code_list_filtered) == 0:
         return 0
     with TSLfm() as tsl:
-        df = tsl.process_result_data_type(tsl.get_mkt_min_k(date,date,code_list_filtered))
+        df = tsl.process_result_data_type(
+            tsl.get_mkt_min_k(date, date, code_list_filtered))
     if not df.empty:
-        logger.info(f'Getting a df of {df.shape}: {code_list[0][:-4]} on {date}')
-        ddb.append_hft_table(ddb.ddf_hft_mink_tbname,df)
+        logger.info(
+            f'Getting a df of {df.shape}: {code_list[0][:-4]} on {date}')
+        ddb.append_hft_table(ddb.ddf_hft_mink_tbname, df)
 
-# def run_pool_add_byday_code_init_minKline(date_list,code_list):
-#     df_list=[]
-#     code_list_filtered=code_list
-#     ddb1 = DDBfm(running_which_env)
-#     tb=ddb1.load_tb(tableName=ddb1.ddf_hft_mink_tbname)
-#     # tb=ddb1.sess.loadTable(dbPath=ddb1.ddb_hft_dbPath, tableName=ddb1.ddf_hft_mink_tbname)
-#     for date in date_list:
-#         with TSLfm() as tsl:
-#             df = tsl.process_result_data_type(tsl.get_mkt_min_k(date,date,code_list))
-#         if df.empty:
-#             continue
-#         code_list_filtered = ddb1.get_missing_code_date_in_tb(tb,date,code_list)
-#         if len(code_list_filtered)==0:
-#             continue
-#         logger.info(f"getting {'+'.join(code_list_filtered)} on {date}")
-#         df=df[df['code'].isin(code_list_filtered)]
-#         df_list.append(df)
-#     ddb1.close_sess()
-#     del ddb1
-#     if df_list:
-#         df_all = pd.concat(df_list)
-#         ddb2 = DDBfm(running_which_env,pool=True)
-#         logger.info(f'Getting a df of {df_all.shape}: {code_list[0][:-4]} on {"+".join(date_list)}')
-#         ddb2.append_pool_hft_table(ddb2.ddf_hft_mink_tbname,df_all)
-#         ddb2.clear_pool()
-#         del ddb2
 
-def check_if_date_codelist_exists(typ,date,code_list):
-    code_list_filtered=code_list
+def check_if_date_codelist_exists(typ, date, code_list):
+    code_list_filtered = code_list
     ddb1 = DDBfm(running_which_env)
-    if typ=='tick':
+    if typ == 'tick':
         tbName = ddb1.ddf_hft_tick_tbname
-    elif typ=='mink':
+    elif typ == 'mink':
         tbName = ddb1.ddf_hft_mink_tbname
-    code_list_filtered = ddb1.get_missing_code_date_in_tb(tbName,date,code_list)
+    code_list_filtered = ddb1.get_missing_code_date_in_tb(
+        tbName, date, code_list)
     if code_list_filtered:
-        logger.info(f"Need to download {'+'.join(code_list_filtered)} on {date} in {tbName}")
+        logger.info(
+            f"Need to download {'+'.join(code_list_filtered)} on {date} in {tbName}")
     else:
         logger.info(f"all codes checked in database {tbName} on {date}")
     ddb1.close_sess()
@@ -89,122 +62,145 @@ def check_if_date_codelist_exists(typ,date,code_list):
     return code_list_filtered
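
A typical call of the helper above, as the pool loader further down uses it (hypothetical inputs; assumes the DDBfm config for running_which_env is reachable):

# Hypothetical usage: returns the codes that still have no rows for that date
# in the mink table, or the full list if the existence check raises.
missing = check_if_date_codelist_exists('mink', '20221102', ['AP2305', 'AP2310'])
if missing:
    print(f"still to fetch on 20221102: {'+'.join(missing)}")
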
-def run_pool_add_by_datelist_codeinit(typ,date_list,code_list,if_check=1):
-    df_list=[]
+def run_pool_add_by_datelist_codeinit(typ, date_list, code_list, if_check=1):
+    df_list = []
     for date in date_list:
         if if_check:
-            code_list_filtered = check_if_date_codelist_exists(typ,date,code_list)
+            code_list_filtered = check_if_date_codelist_exists(
+                typ, date, code_list)
         else:
             code_list_filtered = code_list
         with TSLfm() as tsl:
             if typ == 'tick':
-                df = tsl.process_result_data_type(tsl.get_trade_tick(date,date,code_list_filtered))
+                df = tsl.process_result_data_type(
+                    tsl.get_trade_tick(date, date, code_list_filtered))
             elif typ == 'mink':
-                df = tsl.process_result_data_type(tsl.get_mkt_min_k(date,date,code_list_filtered))
+                df = tsl.process_result_data_type(
+                    tsl.get_mkt_min_k(date, date, code_list_filtered))
         if not df.empty:
             df_list.append(df)
     if not df_list:
         return 0
     df_all = pd.concat(df_list)
-    ddb2 = DDBfm(running_which_env,pool=True)
-    logger.info(f'Getting a df of {df_all.shape}: {code_list[0][:-4]} on {"+".join(date_list)}')
-    ddb2.append_pool_hft_table(ddb2.ddf_hft_tick_tbname,df_all)
+    ddb2 = DDBfm(running_which_env, pool=True)
+    logger.info(
+        f'Getting a df of {df_all.shape}: {code_list[0][:-4]} on {"+".join(date_list)}')
+    if typ == 'tick':
+        ddb2.append_pool_hft_table(ddb2.ddf_hft_tick_tbname, df_all)
+    elif typ == 'mink':
+        ddb2.append_pool_hft_table(ddb2.ddf_hft_mink_tbname, df_all)
     ddb2.close_sess()
     del ddb2
 
-def run_create_hft_db(date = '20221101'):
+
+def run_create_hft_db(date='20221101', if_mink=1, if_tick=1):
+    code_list = ['T2212']
     ddb = DDBfm(running_which_env)
     ddb.create_hft_database()
-    with TSLfm() as tsl:
-        code_list=['T2212']
-        df_mink = tsl.process_result_data_type(tsl.get_mkt_min_k(date,date,code_list))
-        # print(df)
-    ddb.create_hft_table(ddb.ddf_hft_mink_tbname,df_mink)
-    with TSLfm() as tsl:
-        code_list=['T2212']
-        df_tick = tsl.process_result_data_type(tsl.get_trade_tick(date,date,code_list))
-        # print(df)
-    ddb.create_hft_table(ddb.ddf_hft_tick_tbname,df_tick)
+    if if_mink:
+        logger.info(f"creating mink on {date} for {code_list[0]}")
+        with TSLfm() as tsl:
+            df_mink = tsl.process_result_data_type(
+                tsl.get_mkt_min_k(date, date, code_list))
+            # print(df)
+        ddb.create_hft_table(ddb.ddf_hft_mink_tbname, df_mink)
+    if if_tick:
+        logger.info(f"creating tick on {date} for {code_list[0]}")
+        with TSLfm() as tsl:
+            df_tick = tsl.process_result_data_type(
+                tsl.get_trade_tick(date, date, code_list))
+            # print(df)
+        ddb.create_hft_table(ddb.ddf_hft_tick_tbname, df_tick)
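
With the new if_mink/if_tick flags the bootstrap can build either table on its own; a hypothetical one-off invocation (assuming the DolphinDB and Tinysoft configs used by DDBfm and TSLfm are in place) would be:

# Hypothetical bootstrap call: creates the HFT database and both partitioned
# tables from one sample day of T2212 data.
run_create_hft_db(date='20221101', if_mink=1, if_tick=1)
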
 def run():
-    all_code_dict_by_init={}
+    """
+    too slow. depracated.
+    """
+    all_code_dict_by_init = {}
     for c in code_list_pickel:
         init = c[:-4]
         if init in all_code_dict_by_init:
             all_code_dict_by_init[init].append(c)
         else:
-            all_code_dict_by_init[init]=[c]
+            all_code_dict_by_init[init] = [c]
     # print(all_code_dict_by_init)
-    start_date='2022-09-30'
-    end_date='2022-10-31'
-    allDates = pd.date_range(start_date, end_date, freq ='D')
-    allDates = [i.replace('-','') for i in list(allDates.astype('str'))]
+    start_date = '2022-09-30'
+    end_date = '2022-10-31'
+    allDates = pd.date_range(start_date, end_date, freq='D')
+    allDates = [i.replace('-', '') for i in list(allDates.astype('str'))]
     for date in allDates:
-        for ind,code_init in enumerate(all_code_dict_by_init):
+        for ind, code_init in enumerate(all_code_dict_by_init):
             logger.info(f"Getting {code_init} (no.{ind})")
             code_list = all_code_dict_by_init[code_init]
-            run_add_1day_code_init_minKline(date,code_list)
+            run_add_1day_code_init_minKline(date, code_list)
 
-def run_pool_dates_by_code_init_n_group(typ='mink',gp_amt=10,start_date='20220101',end_date='20221031',if_check=1):
+
+def run_pool_dates_by_code_init_n_group(typ='mink', code_gp_amt=10, date_gp_amt=10, start_date='20220101', end_date='20221031', if_check=1, code_dict_by='init'):
     logger.info("Running run_pool_dates_by_group")
-    all_code_dict_by_init={}
+
+    all_code_dict_by_init = {}
     for c in code_list_pickel:
         init = c[:-4]
         if init in all_code_dict_by_init:
             all_code_dict_by_init[init].append(c)
         else:
-            all_code_dict_by_init[init]=[c]
-    # print(all_code_dict_by_init)
+            all_code_dict_by_init[init] = [c]
+    if code_dict_by == 'init':
+        all_code_dict = all_code_dict_by_init
+    if code_dict_by == 'group':
+        all_code_dict_by_group_no = {}
+        for ind, code_init in enumerate(sorted(all_code_dict_by_init)):
+            group_no = ind % code_gp_amt
+            if group_no not in all_code_dict_by_group_no:
+                all_code_dict_by_group_no[group_no] = all_code_dict_by_init[code_init]
+            else:
+                all_code_dict_by_group_no[group_no] += all_code_dict_by_init[code_init]
+        all_code_dict = all_code_dict_by_group_no
 
-    allDates = pd.date_range(start_date, end_date, freq ='D')
-    dates_dict_by_day={}
+    allDates = pd.date_range(start_date, end_date, freq='D')
+    dates_dict_by_day = {}
     for d in list(allDates.astype('str')):
-        group_no = int(d[-2:])%gp_amt
+        group_no = int(d[-2:]) % date_gp_amt
         if group_no not in dates_dict_by_day:
-            dates_dict_by_day[group_no] = [d.replace('-','')]
+            dates_dict_by_day[group_no] = [d.replace('-', '')]
         else:
-            dates_dict_by_day[group_no].append(d.replace('-',''))
+            dates_dict_by_day[group_no].append(d.replace('-', ''))
     logger.debug(dates_dict_by_day)
     for group_no in dates_dict_by_day:
-        date_list=dates_dict_by_day[group_no]
-        num_of_init = len(all_code_dict_by_init)
-        for ind,code_init in enumerate(all_code_dict_by_init):
+        date_list = dates_dict_by_day[group_no]
+        num_of_code_group = len(all_code_dict)
+        for ind, code_init in enumerate(all_code_dict):
             # done: 'T','TS','TS','TF'
             # if code_init in ['T']: # todo filtered this ,,'TF', 'IC','IF','IH','IM'
-            logger.info(f"Getting {code_init} (no.{ind}/{num_of_init} of group {group_no}/{gp_amt})")
-            code_list = all_code_dict_by_init[code_init]
-            if typ=='mink':
+            logger.info(
+                f"Getting {code_init} (no.{ind}/{num_of_code_group} of date_group {group_no}/{date_gp_amt})")
+            code_list = all_code_dict[code_init]
+            if typ == 'mink':
                 # logger.info('Running mink')
-                run_pool_add_by_datelist_codeinit('mink',date_list,code_list,if_check)
+                print(date_list)
+                print(code_list)
+                run_pool_add_by_datelist_codeinit(
+                    'mink', date_list, code_list, if_check)
                 # run_pool_add_byday_code_init_minKline(date_list,code_list)
-            elif typ=='tick':
+            elif typ == 'tick':
                 logger.info('Running tick')
-                run_pool_add_by_datelist_codeinit('tick',date_list,code_list,if_check)
+                run_pool_add_by_datelist_codeinit(
+                    'tick', date_list, code_list, if_check)
 
 
 if __name__ == '__main__':
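
To make the new code_gp_amt / date_gp_amt parameters concrete, the bucketing arithmetic from run_pool_dates_by_code_init_n_group is repeated below as a self-contained sketch with made-up codes and a short date span:

# Illustrative only: mirrors the grouping logic above with toy inputs.
import pandas as pd

code_list_sample = ['AP2305', 'AP2310', 'T2212', 'TF2212']   # hypothetical codes
code_gp_amt, date_gp_amt = 2, 5

# 1) bucket codes by their "init" (everything except the last 4 characters)
all_code_dict_by_init = {}
for c in code_list_sample:
    all_code_dict_by_init.setdefault(c[:-4], []).append(c)

# 2) merge the sorted inits round-robin into code_gp_amt groups
all_code_dict_by_group_no = {}
for ind, init in enumerate(sorted(all_code_dict_by_init)):
    all_code_dict_by_group_no.setdefault(ind % code_gp_amt, []).extend(
        all_code_dict_by_init[init])
print(all_code_dict_by_group_no)
# {0: ['AP2305', 'AP2310', 'TF2212'], 1: ['T2212']}

# 3) bucket calendar days by day-of-month modulo date_gp_amt
dates_dict_by_day = {}
for d in pd.date_range('20221101', '20221110', freq='D').astype('str'):
    dates_dict_by_day.setdefault(int(d[-2:]) % date_gp_amt, []).append(d.replace('-', ''))
print(dates_dict_by_day)
# {1: ['20221101', '20221106'], 2: ['20221102', '20221107'], 3: ['20221103', '20221108'],
#  4: ['20221104', '20221109'], 0: ['20221105', '20221110']}
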
@@ -214,13 +210,17 @@ if __name__ == '__main__':
     tic = time.perf_counter()
 
-    typ='mink'
-    st_d='20221101'
-    en_d='20221102'
+    typ = 'mink'
+    st_d = '20221102'
+    en_d = '20221103'
     if_check = 1
+    split_code_into_howmany_groups_no = 10
+    split_date_into_howmany_groups = 5
 
-    logger.info(f"Going to run {typ} from {st_d} to {en_d} with if_check dupliactes={if_check} in {running_which_env}, plz check if this info is correct.\n\n\n\n")
-    run_pool_dates_by_code_init_n_group(typ=typ,gp_amt=3,start_date=st_d,end_date=en_d,if_check=if_check)
+    logger.info(
+        f"Going to run {typ} from {st_d} to {en_d} with if_check dupliactes={if_check} in {running_which_env}, plz check if this info is correct.\n\n\n\n")
+    run_pool_dates_by_code_init_n_group(typ=typ, code_gp_amt=split_code_into_howmany_groups_no,
+                                        date_gp_amt=split_date_into_howmany_groups, start_date=st_d, end_date=en_d, if_check=if_check, code_dict_by='group')
     # run_pool_dates_by_code_init_n_group(typ='mink',group_amount=5)
     toc = time.perf_counter()

@@ -11,8 +11,9 @@ pt=loadTable("dfs://hft_futuremarket_ts","TickPartitioned")
 select top 40 * from pt where code=`T2212 and m_nDatetime.date() =2022.09.28d
 select top 4 * from pt where code_init=`T and m_nDatetime.date()=2022.09.28d
 
-select distinct m_nDatetime.date() from pt where code=`AP2211
+select distinct m_nDatetime.date() from pt where m_nDatetime.date()=2022.11.01d and (code=`AP2211 or code=`AP2212)
 select distinct code from pt
+select distinct code from pt where m_nDatetime.date()=2022.11.01d and code in (`LR2211)
 
 select count(*) from loadTable("dfs://hft_futuremarket_ts", "MinKlinePartitioned")
 select count(*) from loadTable("dfs://hft_futuremarket_ts", "TickPartitioned")
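
These checks can also be run from Python through the same API the loader uses; a minimal sketch (placeholder host and credentials, assuming the dolphindb package and a reachable server):

# Minimal sketch: run one of the verification queries above via the
# dolphindb Python API. Host, user and password are placeholders.
import dolphindb as ddb

sess = ddb.session()
sess.connect("localhost", 8848, "admin", "123456")
n_mink = sess.run('select count(*) from loadTable("dfs://hft_futuremarket_ts", "MinKlinePartitioned")')
print(n_mink)   # row count of the minute-kline partitioned table
sess.close()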
