commit 4d2fb39bd7 by yz, 2 years ago (branch: main)

@@ -3080,6 +3080,153 @@
    "source": [
     "df_list\n"
    ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 2,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "from src.code_list_all_pkl import code_list\n",
+    "code_init = set([c[:-4] for c in code_list])"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 5,
+   "metadata": {},
+   "outputs": [
+    {
+     "data": {
+      "text/plain": [
+       "77"
+      ]
+     },
+     "execution_count": 5,
+     "metadata": {},
+     "output_type": "execute_result"
+    }
+   ],
+   "source": [
+    "len(code_init)"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 6,
+   "metadata": {},
+   "outputs": [
+    {
+     "data": {
+      "text/plain": [
+       "{'ap': 'AP',\n",
+       " 'fg': 'FG',\n",
+       " 'rm': 'RM',\n",
+       " 'al': 'al',\n",
+       " 'rs': 'RS',\n",
+       " 'pg': 'pg',\n",
+       " 'jm': 'jm',\n",
+       " 'wh': 'WH',\n",
+       " 'er': 'ER',\n",
+       " 'cs': 'cs',\n",
+       " 'fb': 'fb',\n",
+       " 'pk': 'PK',\n",
+       " 'pp': 'pp',\n",
+       " 'p': 'p',\n",
+       " 'ag': 'ag',\n",
+       " 'pm': 'PM',\n",
+       " 'ih': 'IH',\n",
+       " 'wt': 'WT',\n",
+       " 'ni': 'ni',\n",
+       " 'j': 'j',\n",
+       " 'ru': 'ru',\n",
+       " 'b': 'b',\n",
+       " 'ta': 'TA',\n",
+       " 'rr': 'rr',\n",
+       " 'sa': 'SA',\n",
+       " 'jr': 'JR',\n",
+       " 'y': 'y',\n",
+       " 'jd': 'jd',\n",
+       " 'lr': 'LR',\n",
+       " 'sr': 'SR',\n",
+       " 'm': 'm',\n",
+       " 'sf': 'SF',\n",
+       " 'eb': 'eb',\n",
+       " 'i': 'i',\n",
+       " 't': 'T',\n",
+       " 'ur': 'UR',\n",
+       " 'bu': 'bu',\n",
+       " 'zn': 'zn',\n",
+       " 'pf': 'PF',\n",
+       " 'l': 'l',\n",
+       " 'ts': 'TS',\n",
+       " 'ws': 'WS',\n",
+       " 'zc': 'ZC',\n",
+       " 'tc': 'TC',\n",
+       " 'eg': 'eg',\n",
+       " 'me': 'ME',\n",
+       " 'a': 'a',\n",
+       " 'cy': 'CY',\n",
+       " 'sn': 'sn',\n",
+       " 'pb': 'pb',\n",
+       " 'ic': 'IC',\n",
+       " 'ro': 'RO',\n",
+       " 'wr': 'wr',\n",
+       " 'ri': 'RI',\n",
+       " 'oi': 'OI',\n",
+       " 'fu': 'fu',\n",
+       " 'nr': 'nr',\n",
+       " 'cu': 'cu',\n",
+       " 'if': 'IF',\n",
+       " 'rb': 'rb',\n",
+       " 'bc': 'bc',\n",
+       " 'cf': 'CF',\n",
+       " 'sp': 'sp',\n",
+       " 'c': 'c',\n",
+       " 'lh': 'lh',\n",
+       " 'tf': 'TF',\n",
+       " 'lu': 'lu',\n",
+       " 'ma': 'MA',\n",
+       " 'im': 'IM',\n",
+       " 'hc': 'hc',\n",
+       " 'sm': 'SM',\n",
+       " 'bb': 'bb',\n",
+       " 'cj': 'CJ',\n",
+       " 'au': 'au',\n",
+       " 'ss': 'ss',\n",
+       " 'v': 'v',\n",
+       " 'sc': 'sc'}"
+      ]
+     },
+     "execution_count": 6,
+     "metadata": {},
+     "output_type": "execute_result"
+    }
+   ],
+   "source": [
+    "code_init_case_mapping = {}\n",
+    "for c in code_init:\n",
+    "    code_init_case_mapping[c.lower()] = c\n",
+    "code_init_case_mapping"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 1,
+   "metadata": {},
+   "outputs": [
+    {
+     "name": "stdout",
+     "output_type": "stream",
+     "text": [
+      "77\n"
+     ]
+    }
+   ],
+   "source": [
+    "from src.code_list_all_pkl import code_init_case_mapping as cp\n",
+    "print(len(cp))"
+   ]
   }
  ],
 "metadata": {

@@ -25,9 +25,6 @@ class DDBfm():
     all_fm_init=['sc', 'v', 'TS', 'MA', 'AP', 'jm', 'bc', 'bb', 'fu', 'IM', 'IF', 'a', 'lu', 'FG', 'cu', 'al', 'IH', 'RS', 'pg', 'CF', 'SF', 'ni', 'hc', 'UR', 'm', 'SR', 'j', 'PF', 'RM', 'T', 'c', 'JR', 'l', 'p', 'sp', 'CY', 'pb', 'TF', 'b', 'eg', 'rb', 'PK', 'sn', 'nr', 'pp', 'CJ', 'eb', 'SA', 'y', 'RI', 'lh', 'jd', 'OI', 'WH', 'ss', 'ru', 'zn', 'fb', 'rr', 'PM', 'au', 'TA', 'ZC', 'IC', 'bu', 'SM', 'wr', 'cs', 'LR', 'ag', 'i']
-    ddb_daily_path = "dfs://daily_futuremarket_ts"
-    ddb_daily_dbname = "db_daily_fm"
 
     ddb_hft_dbPath="dfs://hft_futuremarket_ts"
     ddb_hft_dbName="hft_futuremarket_ts"
@@ -35,6 +32,7 @@ class DDBfm():
     ddf_hft_mink_tbname="MinKlinePartitioned"
+    ddf_hft_dailydom_tbname="DailyFutureInfoPartitioned"
 
     def __init__(self, which_server='local', pool=False):
         self.pool=None
@@ -46,6 +44,8 @@ class DDBfm():
         if not pool:
             self.sess = ddb.session(self.ddb_config['host'], 8848)
             self.sess.login(self.ddb_config['username'], self.ddb_config['password'])
+            if self.sess.existsDatabase(self.ddb_hft_dbPath):
+                self.hft_db = self.sess.database(dbPath=self.ddb_hft_dbPath)
         else:
             self.pool = ddb.DBConnectionPool(self.ddb_config['host'], 8848, 12, self.ddb_config['username'], self.ddb_config['password'])
@@ -108,20 +108,33 @@ class DDBfm():
             pt = self.hft_db.createPartitionedTable(table=t, tableName=tbName, partitionColumns=['m_nDatetime', 'code_init'])
         pt.append(t)
 
     def append_hft_table(self, tbName, df):
         appender = ddb.tableAppender(tableName=tbName, ddbSession=self.sess, dbPath=self.ddb_hft_dbPath)
         appender.append(df)
         logger.info(f"successfully appended a df of {df.shape} to {tbName}")
 
-    def append_pool_hft_table(self, tbName, df):
-        appender = ddb.PartitionedTableAppender(self.ddb_hft_dbPath, tbName, 'm_nDatetime', self.pool)
+    def append_pool_hft_table(self, tbName, df, sort_col='m_nDatetime'):
+        appender = ddb.PartitionedTableAppender(self.ddb_hft_dbPath, tbName, sort_col, self.pool)
         appender.append(df)
         logger.info(f"Appending some df of {df.shape} to {tbName}")
 
+    def create_daily_info_table(self, df):
+        tbName = self.ddf_hft_dailydom_tbname
+        t = self.sess.table(data=df, tableAliasName=tbName)
+        if self.sess.existsTable(self.ddb_hft_dbPath, tbName):
+            self.sess.dropTable(self.ddb_hft_dbPath, tbName)
+        if self.which_server == 'prd':
+            pt = self.hft_db.createPartitionedTable(table=t, tableName=tbName, partitionColumns=['m_nDate', 'code_init'], sortColumns=["code_init", "m_nDate"], compressMethods={"m_nDate": "delta"})
+        else:
+            pt = self.hft_db.createPartitionedTable(table=t, tableName=tbName, partitionColumns=['m_nDate', 'code_init'])
+        pt.append(t)
+
     def search_code_date_in_tb(self, tbName, curr_date, curr_code):
+        """
+        Messy. Deprecated.
+        """
         curr_date_formatted = curr_date[:4]+'.'+curr_date[4:6]+'.'+curr_date[6:]
         # print('?did i split this right')
         # print(curr_date_formatted)
@@ -130,7 +143,7 @@ class DDBfm():
         try:
             # doing this cuz there's no method to check if a table is empty lol
             cond = f"code=`{curr_code}, m_nDatetime.date()={curr_date_formatted}d"
-            cond = f"code_init=`{curr_code[:2]}, m_nDatetime.date()={curr_date_formatted}d"
+            cond = f"code_init=`{curr_code[:-4]}, m_nDatetime.date()={curr_date_formatted}d"
             # print(cond)
             df = tb.select('*').where(cond).top(1).toDF()
             if df.empty or df.shape[0]==0:
@@ -182,6 +195,29 @@ class DDBfm():
                 logger.warning(f'{c} on {curr_date} is recorded')
         return return_code_list
 
+    def get_missing_code_init_date_in_tb(self, tbName, curr_date, code_init_list):  # code list has to be like all CF 2022-11-11 fixed it
+        logger.info(f"Quickly checking if data on {'+'.join(code_init_list)} {curr_date} exists...")  # could do a slower check on the number of rows
+        curr_date_formatted = curr_date[:4]+'.'+curr_date[4:6]+'.'+curr_date[6:]
+        try:
+            df = self.sess.run(f"""pt=loadTable("{self.ddb_hft_dbPath}","{tbName}");
+                select distinct code_init from pt where m_nDatetime.date()={curr_date_formatted}d and code_init in ({" ".join(["`"+c for c in code_init_list])});
+                """)
+            if df.empty or df.shape[0]==0:
+                return code_init_list
+        except Exception:
+            return code_init_list
+        ddb_code_list = df['distinct_code_init'].to_list()
+        return_code_list = []
+        for c in code_init_list:
+            if c not in ddb_code_list:
+                return_code_list.append(c)
+            else:
+                logger.warning(f'{c} on {curr_date} is recorded')
+        return return_code_list
+
 
 if __name__ == '__main__':
     pass
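For reference, a stand-alone sketch of the query string that get_missing_code_init_date_in_tb sends through sess.run; the table, date, and codes here are hypothetical:

# DolphinDB encodes symbol literals with backticks and date literals with a
# trailing 'd'; this reproduces the f-string assembly from the method above.
curr_date = '20221101'
code_init_list = ['CF', 'al', 'T']
curr_date_formatted = curr_date[:4] + '.' + curr_date[4:6] + '.' + curr_date[6:]  # '2022.11.01'
query = (f'pt=loadTable("dfs://hft_futuremarket_ts","MinKlinePartitioned");\n'
         f'select distinct code_init from pt where m_nDatetime.date()={curr_date_formatted}d '
         f'and code_init in ({" ".join("`" + c for c in code_init_list)});')
print(query)  # ... code_init in (`CF `al `T);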

@@ -234,6 +234,40 @@ class TSLfm:
         return df
 
+    def get_zl_code_list(self, date):
+        scpt = """
+        zls:= getbk('期货主力');// fetch all current dominant-contract virtual codes
+        endt:={date}T;
+        stocks:=array();
+        stocksqls:=array();
+        for i:=0 to length(zls)-1 do
+        stocks[i]:= ZLToFuturesID(zls[i],endt);// resolve the actual dominant contract code on the given date
+        return stocks; """.format(date=date)
+        logger.debug(scpt)
+        r = self.c.exec(scpt, timeout=self.timeout_default)
+        if r:
+            return pd.DataFrame(r.value())
+        else:
+            return pd.DataFrame()
+
+    def get_cont_code_list(self, date):
+        scpt = """
+        zls:= getbk('期货连续');// fetch all current continuous-contract virtual codes
+        endt:={date}T;
+        stocks:=array();
+        stocksqls:=array();
+        for i:=0 to length(zls)-1 do
+        stocks[i]:= ZLToFuturesID(zls[i],endt);// resolve the actual contract code on the given date
+        return stocks; """.format(date=date)
+        logger.debug(scpt)
+        r = self.c.exec(scpt, timeout=self.timeout_default)
+        if r:
+            return pd.DataFrame(r.value())
+        else:
+            return pd.DataFrame()
+
 
 if __name__ == '__main__':
     from loguru import logger
     logger.add("../logs/{time:YYYYMMDD-HHmmss}_TSLfm.log", rotation="10 MB", compression="zip", level="INFO")
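Note that get_zl_code_list and get_cont_code_list differ only in the TSL block name ('期货主力' vs '期货连续'). A possible consolidation, sketched under the assumption that it would live on TSLfm next to the two methods above (not part of this commit; self.c and self.timeout_default are the existing TSLfm members used there):

import pandas as pd

def get_code_list_by_block(self, date, block_name):
    # Same TSL script as above, with the block name parameterized.
    scpt = """
    zls:= getbk('{block}');
    endt:={date}T;
    stocks:=array();
    for i:=0 to length(zls)-1 do
    stocks[i]:= ZLToFuturesID(zls[i],endt);
    return stocks; """.format(block=block_name, date=date)
    r = self.c.exec(scpt, timeout=self.timeout_default)
    return pd.DataFrame(r.value()) if r else pd.DataFrame()

# usage: self.get_code_list_by_block(date, '期货主力') or ('期货连续')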

File diff suppressed because one or more lines are too long

@@ -1,12 +1,13 @@
-from code_list_all_pkl import code_list as code_list_pickel_from_file
+from code_list_all_pkl import code_list as code_list_pickel_from_file, all_fm_init_curr, tsl_ind_codeinit_mapping
 from TSLfm import TSLfm
 from DDBfm import DDBfm
 import pandas as pd
+import numpy as np
 from loguru import logger
 from os.path import dirname, abspath, join
 import sys
 
-running_which_env = 'prd'
+running_which_env = 'dev'
 def run_add_1day_code_init_minKline(date, code_list):
@@ -32,10 +33,19 @@ def run_add_1day_code_init_minKline(date, code_list):
         f'Getting a df of {df.shape}: {"+".join(code_list)} on {date}')
     ddb.append_hft_table(ddb.ddf_hft_mink_tbname, df)
 
+def map_code_tsl_to_9999indcode(code_tsl):
+    if code_tsl in tsl_ind_codeinit_mapping:
+        return tsl_ind_codeinit_mapping[code_tsl]+'9999'
+    else:
+        logger.error(f"TSL code didn't find a mapping for {code_tsl}")
+        return np.nan
+
 def check_if_date_codelist_exists(typ, date, code_list):
     code_list_filtered = []
     for c in code_list:
+        if c[:-4] not in all_fm_init_curr:  # todo code that's not in curr is not counted!!! len: s6
+            logger.warning("There's unrecognized code init!!!!")
+            continue
         if c[-4:] >= date[2:6]:
             code_list_filtered.append(c)
     if not code_list_filtered:
@@ -44,11 +54,11 @@ def check_if_date_codelist_exists(typ, date, code_list):
     if typ == 'tick':
         tbName = ddb1.ddf_hft_tick_tbname
-    elif typ == 'mink':
+    elif typ in ['mink','indl']:
         tbName = ddb1.ddf_hft_mink_tbname
 
     code_list_filtered = ddb1.get_missing_code_date_in_tb(
-        tbName, date, code_list)
+        tbName, date, code_list_filtered)
 
     if code_list_filtered:
         logger.info(
@@ -60,15 +70,27 @@ def check_if_date_codelist_exists(typ, date, code_list):
     return code_list_filtered
 
 def run_pool_add_by_datelist_codeinit(typ, date_list, code_list, if_check=1):
     df_list = []
+    if typ == 'indl':
+        code_list_filtered = []  # real codes as stored in ddb
+        code_list_tsl_code = []
+        for c in code_list:
+            code_list_filtered.append(map_code_tsl_to_9999indcode(c))
+            code_list_tsl_code.append(c)
+    else:
+        code_list_filtered = code_list
     for date in date_list:
         if if_check:
             code_list_filtered = check_if_date_codelist_exists(
-                typ, date, code_list)
-        else:
-            code_list_filtered = code_list
+                typ, date, code_list_filtered)
         with TSLfm() as tsl:
             if typ == 'tick':
                 df = tsl.process_result_data_type(
@@ -76,6 +98,15 @@ def run_pool_add_by_datelist_codeinit(typ, date_list, code_list, if_check=1):
             elif typ == 'mink':
                 df = tsl.process_result_data_type(
                     tsl.get_mkt_min_k(date, date, code_list_filtered))
+            elif typ == 'indl':
+                df = tsl.process_result_data_type(
+                    tsl.get_mkt_min_k(date, date, code_list_tsl_code))
+
+            if typ == 'indl' and not df.empty:
+                # df.rename(columns={'code':'code_tsl'},inplace=True)
+                df['code'] = df['code'].apply(map_code_tsl_to_9999indcode)
+                df['code_init'] = df['code'].apply(lambda x: x[:-4])
+                # df.drop('code_tsl',axis=1,inplace=True)
             if not df.empty:
                 df_list.append(df)
     if not df_list:
@@ -87,7 +118,7 @@ def run_pool_add_by_datelist_codeinit(typ, date_list, code_list, if_check=1):
         f'Getting a df of {df_all.shape}: {code_list[0][:-4]} on {"+".join(date_list)}')
     if typ == 'tick':
         ddb2.append_pool_hft_table(ddb2.ddf_hft_tick_tbname, df_all)
-    elif typ == 'mink':
+    elif typ in ['mink','indl']:
         ddb2.append_pool_hft_table(ddb2.ddf_hft_mink_tbname, df_all)
     ddb2.close_sess()
     del ddb2
@@ -163,17 +194,45 @@ def run_pool_dates_by_code_init_n_group(typ='mink', code_gp_amt=10, date_gp_amt=
         else:
             all_code_dict_by_init[init] = [c]
 
-    if code_dict_by == 'init':
-        all_code_dict = all_code_dict_by_init
-    if code_dict_by == 'group':
-        all_code_dict_by_group_no = {}
-        for ind, code_init in enumerate(sorted(all_code_dict_by_init)):
-            group_no = ind % code_gp_amt
-            if group_no not in all_code_dict_by_group_no:
-                all_code_dict_by_group_no[group_no] = all_code_dict_by_init[code_init]
-            else:
-                all_code_dict_by_group_no[group_no] += all_code_dict_by_init[code_init]
-        all_code_dict = all_code_dict_by_group_no
+    if typ in ['mink','tick']:
+        code_list_pickel = code_list_pickel_from_file
+        code_list_pickel = sorted(list(set(code_list_pickel)))
+        all_code_dict_by_init = {}
+        for c in code_list_pickel:
+            if c[-4:] < start_date[2:6]:
+                continue
+            if c[:-4] in all_fm_init_curr:  # todo code that's not in curr is not counted!!! len: s6
+                init = c[:-4]
+                if init in all_code_dict_by_init:
+                    all_code_dict_by_init[init].append(c)
+                else:
+                    all_code_dict_by_init[init] = [c]
+            else:
+                logger.warning("There's unrecognized code init!!!!")
+        if code_dict_by == 'init':
+            all_code_dict = all_code_dict_by_init
+        if code_dict_by == 'group':
+            all_code_dict_by_group_no = {}
+            for ind, code_init in enumerate(sorted(all_code_dict_by_init)):
+                group_no = ind % code_gp_amt
+                if group_no not in all_code_dict_by_group_no:
+                    all_code_dict_by_group_no[group_no] = all_code_dict_by_init[code_init]
+                else:
+                    all_code_dict_by_group_no[group_no] += all_code_dict_by_init[code_init]
+            all_code_dict = all_code_dict_by_group_no
+    elif typ in ['indl']:
+        tsl_ind_code_list = []
+        real_code_init_list = []
+        all_code_dict = {}
+        for c in sorted(tsl_ind_codeinit_mapping):
+            if tsl_ind_codeinit_mapping[c] in all_fm_init_curr:
+                tsl_ind_code_list.append(c)
+                real_code_init_list.append(tsl_ind_codeinit_mapping[c])
+                all_code_dict[tsl_ind_codeinit_mapping[c]] = [c]
+            else:
+                logger.warning(f"There's unrecognized code init: {tsl_ind_codeinit_mapping[c]}!!!!")
 
     allDates = pd.date_range(start_date, end_date, freq='D')
     dates_dict_by_day = {}
@@ -198,16 +257,9 @@
             code_list = all_code_dict[code_init]
             logger.info(date_list)
             logger.info(code_list)
-            if typ == 'mink':
-                # logger.info('Running mink')
-                run_pool_add_by_datelist_codeinit(
-                    'mink', date_list, code_list, if_check)
-                # run_pool_add_byday_code_init_minKline(date_list,code_list)
-            elif typ == 'tick':
-                logger.info('Running tick')
-                run_pool_add_by_datelist_codeinit(
-                    'tick', date_list, code_list, if_check)
+            run_pool_add_by_datelist_codeinit(
+                typ, date_list, code_list, if_check)
 
 if __name__ == '__main__':
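Worked example of the code_dict_by == 'group' round-robin in the hunk above: with code_gp_amt=3, the sorted inits are dealt into three buckets in turn (inputs hypothetical; note ASCII sorting puts uppercase inits before lowercase ones):

all_code_dict_by_init = {'AP': ['AP2211'], 'CF': ['CF2301'], 'T': ['T2212'],
                         'al': ['al2212'], 'cu': ['cu2212']}
code_gp_amt = 3
all_code_dict_by_group_no = {}
for ind, code_init in enumerate(sorted(all_code_dict_by_init)):
    group_no = ind % code_gp_amt
    all_code_dict_by_group_no.setdefault(group_no, [])
    all_code_dict_by_group_no[group_no] += all_code_dict_by_init[code_init]
print(all_code_dict_by_group_no)
# {0: ['AP2211', 'al2212'], 1: ['CF2301', 'cu2212'], 2: ['T2212']}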

@@ -0,0 +1,173 @@
+from code_list_all_pkl import code_list as code_list_pickel_from_file, code_init_case_mapping, all_fm_init_curr
+from TSLfm import TSLfm
+from DDBfm import DDBfm
+import pandas as pd
+from loguru import logger
+from os.path import dirname, abspath, join
+import sys
+
+running_which_env = 'dev'
+
+
+def get_code_init_og_code(code):
+    if not code:
+        print('?')
+    code_init_lower = code[:-4]
+    return code_init_case_mapping[code_init_lower]+code[-4:]
+
+
+def process_zl_cl_data(df_zl, df_cl):
+    df_zl.dropna(inplace=True)
+    df_zl.rename(columns={0: 'code_dom'}, inplace=True)
+    df_zl['code_dom'] = df_zl['code_dom'].apply(get_code_init_og_code)
+    df_zl['code_init'] = df_zl['code_dom'].apply(lambda x: x[:-4])
+    print(df_zl)
+
+    df_cl.dropna(inplace=True)
+    df_cl.rename(columns={0: 'code_cont'}, inplace=True)
+    df_cl['code_cont'] = df_cl['code_cont'].apply(get_code_init_og_code)
+    df_cl['code_init'] = df_cl['code_cont'].apply(lambda x: x[:-4])
+    print(df_cl)
+
+    df_cl_new_list = []
+    for ci, cigp in df_cl.groupby("code_init"):
+        if ci not in all_fm_init_curr:
+            logger.warning("There's unrecognized code init!!!!")
+            continue
+        cont_ind_list = ['code_cont', 'code_cont1',
+                         'code_cont2', 'code_cont3', 'code_cont4']
+        df_cont = pd.DataFrame([], columns=['code_init']+cont_ind_list)
+        df_cont.loc[0, 'code_init'] = ci
+        all_cont = sorted(list(cigp['code_cont']))
+        for i, cont in enumerate(all_cont):
+            df_cont.loc[0, cont_ind_list[i]] = cont
+        df_cl_new_list.append(df_cont)
+    if not df_cl_new_list:
+        return pd.DataFrame()
+    df_cl_new = pd.concat(df_cl_new_list)
+    # print(df_cl_new)
+    return pd.merge(df_zl, df_cl_new, on='code_init')
+
+
+def get_zl_cl_df_by_date(start_date, end_date, date_gp_amt=10):
+    date_list = []
+    allDates = pd.date_range(start_date, end_date, freq='D')
+    for d in allDates.astype('str'):
+        date_list.append(d.replace('-', ''))
+
+    df_list = []
+    date_list_group = {}
+    for ind, date in enumerate(date_list):
+        gp_no = int(ind/date_gp_amt)
+        if gp_no not in date_list_group:
+            date_list_group[gp_no] = [date]
+        else:
+            date_list_group[gp_no].append(date)
+    print(date_list_group)
+    for gp_no in date_list_group:
+        date_list = date_list_group[gp_no]
+        with TSLfm() as tsl:
+            for ind, date in enumerate(date_list):
+                df_zl = tsl.get_zl_code_list(date)
+                df_cl = tsl.get_cont_code_list(date)
+                if df_cl.empty and df_zl.empty:
+                    continue
+                df = process_zl_cl_data(df_zl, df_cl)
+                df['m_nDate'] = pd.to_datetime(date, format='%Y%m%d')
+                df_list.append(df)
+    df_all = pd.concat(df_list)
+    logger.debug(f'getting zl+cl data of \n{df_all}')
+    return df_all
+
+
+def check_if_date_codeinitlist_exists(date, code_init_list):
+    code_init_list_filtered = []
+    for ci in code_init_list:
+        if ci not in all_fm_init_curr:  # todo code that's not in curr is not counted!!! len: s6
+            logger.warning(f"There's unrecognized code init: {ci}!!!!")
+            continue
+        code_init_list_filtered.append(ci)
+    if not code_init_list_filtered:
+        return code_init_list_filtered
+
+    ddb1 = DDBfm(running_which_env)
+    code_init_list_filtered = ddb1.get_missing_code_init_date_in_tb(
+        ddb1.ddf_hft_dailydom_tbname, date, code_init_list_filtered)
+    if code_init_list_filtered:
+        logger.info(
+            f"Need to download {'+'.join(code_init_list_filtered)} on {date} in {ddb1.ddf_hft_dailydom_tbname}")
+    else:
+        logger.info(
+            f"all codes checked in database tb {ddb1.ddf_hft_dailydom_tbname} on {date}")
+    ddb1.close_sess()
+    del ddb1
+    return code_init_list_filtered
+
+
+def run_create_zl_table_in_db(date='20221101'):
+    logger.info(f"creating zl cl table on {date}")
+    df = get_zl_cl_df_by_date(date, date)
+    ddbfm = DDBfm(running_which_env)
+    ddbfm.create_daily_info_table(df)
+
+
+def run_pool_append_zl_table_in_db(start_date, end_date, if_check=True):
+    logger.info(f"Running append zl cl table from {start_date} to {end_date}")
+    df = get_zl_cl_df_by_date(start_date, end_date)
+    ddbfm = DDBfm(running_which_env, pool=True)
+    # for date
+    #     if if_check:
+    #         check_if_date_codeinitlist_exists(date, df['code_init'].tolist())
+    ddbfm.append_pool_hft_table(
+        ddbfm.ddf_hft_dailydom_tbname, df, sort_col='m_nDate')
+
+
+if __name__ == '__main__':
+    import time
+
+    ROOT_DIR = abspath(join(dirname(abspath(__file__)), ".."))
+    logger.remove()
+    logger.add(sys.stderr, level="INFO")
+    logger.add(ROOT_DIR+"/logs/{time:YYYYMMDD-HHmmss}"+f"_{running_which_env}.log",
+               rotation="10 MB", compression="zip", level="INFO")
+
+    for i in range(0, 21):
+        typ = 'dailydom'
+        # st_d = '20220101'
+        # en_d = '20221031'
+        st_d = str(2000+i)+'0101'
+        en_d = str(2000+i)+'1231'
+        if_check = False
+        logger.warning(
+            f"Going to run *{typ}* from {st_d} to {en_d} with if_check duplicates={if_check} in *{running_which_env}*, plz check if this info is correct.\n\n\n\n")
+        tic = time.perf_counter()
+        # run_create_zl_table_in_db()
+        run_pool_append_zl_table_in_db(
+            start_date=st_d, end_date=en_d, if_check=if_check)
+        toc = time.perf_counter()
+        logger.info(f"Running used {toc - tic:0.4f} seconds")
+        time.sleep(10)
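A tiny illustration of the reshaping done in process_zl_cl_data above: each code_init's continuous contracts are spread across the code_cont..code_cont4 columns of a one-row frame (inputs hypothetical):

import pandas as pd

df_cl = pd.DataFrame({'code_cont': ['T2212', 'T2303', 'T2306'],
                      'code_init': ['T', 'T', 'T']})
cont_ind_list = ['code_cont', 'code_cont1', 'code_cont2', 'code_cont3', 'code_cont4']
df_cont = pd.DataFrame([], columns=['code_init'] + cont_ind_list)
df_cont.loc[0, 'code_init'] = 'T'
for i, cont in enumerate(sorted(df_cl['code_cont'])):
    df_cont.loc[0, cont_ind_list[i]] = cont
print(df_cont)
#   code_init code_cont code_cont1 code_cont2 code_cont3 code_cont4
# 0         T     T2212      T2303      T2306        NaN        NaN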

@@ -3,23 +3,29 @@ getDFSDatabases();
 getScheduledJobs();
 loadTableBySQL(tableName='pt',dbPath="dfs://tsdb",sql="select * limit 10;");
 loadTable("dfs://tsdb","pt")
+loadTable("dfs://hft_stock_ts","KLinePartitioned")
 
 listTables("dfs://hft_futuremarket_ts");
 
 pt=loadTable("dfs://hft_futuremarket_ts","MinKlinePartitioned")
 pt=loadTable("dfs://hft_futuremarket_ts","TickPartitioned")
+pt=loadTable("dfs://hft_futuremarket_ts", "DailyFutureInfoPartitioned")
 
 select top 40 * from pt where code=`T2212 and m_nDatetime.date()=2022.09.28d
 select top 4 * from pt where code_init=`T and m_nDatetime.date()=2022.09.28d
 select distinct m_nDatetime.date() from pt where m_nDatetime.date()=2022.11.01d and (code=`AP2211 or code=`AP2212)
 select distinct code from pt
-select distinct code from pt where m_nDatetime.date()=2022.11.01d and code in (`LR2211)
+select * from pt where m_nDatetime.date()=2022.11.01d and code in (`al2212)
 
 select count(*) from loadTable("dfs://hft_futuremarket_ts", "MinKlinePartitioned")
 select count(*) from loadTable("dfs://hft_futuremarket_ts", "TickPartitioned")
+select count(*) from loadTable("dfs://hft_futuremarket_ts", "DailyFutureInfoPartitioned")
 
-select top 40 * from pt
+pt=loadTable("dfs://daily_stock_ts","daily_kline")
+select top 400 * from pt where code=`IC9999
 schema(pt)
+select count(*) from pt
 
 n=1000000
 ID=rand(100, n)
@@ -35,4 +41,6 @@ pt = db.createPartitionedTable(t, `pt, `date`ID)
 pt.append!(t)
 pt=loadTable(db,`pt)
 select count(x) from pt
+
+db = database("dfs://hft_futuremarket_ts")
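The same ad-hoc counts can be pulled from Python through the session API this repo's DDBfm wraps, mirroring the session/login calls in DDBfm.__init__ (host and credentials here are placeholders):

import dolphindb as ddb

sess = ddb.session('localhost', 8848)
sess.login('admin', '123456')
df = sess.run('select count(*) from loadTable("dfs://hft_futuremarket_ts", "MinKlinePartitioned")')
print(df)
sess.close()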
