Update fetching of ZL (dominant) and CL (continuous) contract data

main^2
yzlocal 2 years ago
parent b94beef451
commit 155851bc18

@ -3000,6 +3000,153 @@
"source": [
"df_list\n"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [],
"source": [
"from src.code_list_all_pkl import code_list\n",
"code_init = set([c[:-4] for c in code_list])"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"77"
]
},
"execution_count": 5,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"len(code_init)"
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"{'ap': 'AP',\n",
" 'fg': 'FG',\n",
" 'rm': 'RM',\n",
" 'al': 'al',\n",
" 'rs': 'RS',\n",
" 'pg': 'pg',\n",
" 'jm': 'jm',\n",
" 'wh': 'WH',\n",
" 'er': 'ER',\n",
" 'cs': 'cs',\n",
" 'fb': 'fb',\n",
" 'pk': 'PK',\n",
" 'pp': 'pp',\n",
" 'p': 'p',\n",
" 'ag': 'ag',\n",
" 'pm': 'PM',\n",
" 'ih': 'IH',\n",
" 'wt': 'WT',\n",
" 'ni': 'ni',\n",
" 'j': 'j',\n",
" 'ru': 'ru',\n",
" 'b': 'b',\n",
" 'ta': 'TA',\n",
" 'rr': 'rr',\n",
" 'sa': 'SA',\n",
" 'jr': 'JR',\n",
" 'y': 'y',\n",
" 'jd': 'jd',\n",
" 'lr': 'LR',\n",
" 'sr': 'SR',\n",
" 'm': 'm',\n",
" 'sf': 'SF',\n",
" 'eb': 'eb',\n",
" 'i': 'i',\n",
" 't': 'T',\n",
" 'ur': 'UR',\n",
" 'bu': 'bu',\n",
" 'zn': 'zn',\n",
" 'pf': 'PF',\n",
" 'l': 'l',\n",
" 'ts': 'TS',\n",
" 'ws': 'WS',\n",
" 'zc': 'ZC',\n",
" 'tc': 'TC',\n",
" 'eg': 'eg',\n",
" 'me': 'ME',\n",
" 'a': 'a',\n",
" 'cy': 'CY',\n",
" 'sn': 'sn',\n",
" 'pb': 'pb',\n",
" 'ic': 'IC',\n",
" 'ro': 'RO',\n",
" 'wr': 'wr',\n",
" 'ri': 'RI',\n",
" 'oi': 'OI',\n",
" 'fu': 'fu',\n",
" 'nr': 'nr',\n",
" 'cu': 'cu',\n",
" 'if': 'IF',\n",
" 'rb': 'rb',\n",
" 'bc': 'bc',\n",
" 'cf': 'CF',\n",
" 'sp': 'sp',\n",
" 'c': 'c',\n",
" 'lh': 'lh',\n",
" 'tf': 'TF',\n",
" 'lu': 'lu',\n",
" 'ma': 'MA',\n",
" 'im': 'IM',\n",
" 'hc': 'hc',\n",
" 'sm': 'SM',\n",
" 'bb': 'bb',\n",
" 'cj': 'CJ',\n",
" 'au': 'au',\n",
" 'ss': 'ss',\n",
" 'v': 'v',\n",
" 'sc': 'sc'}"
]
},
"execution_count": 6,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"code_init_case_mapping = {}\n",
"for c in code_init:\n",
" code_init_case_mapping[c.lower()] = c\n",
"code_init_case_mapping"
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"77\n"
]
}
],
"source": [
"from src.code_list_all_pkl import code_init_case_mapping as cp\n",
"print(len(cp))"
]
}
],
"metadata": {

@ -25,9 +25,6 @@ class DDBfm():
all_fm_init=['sc', 'v', 'TS', 'MA', 'AP', 'jm', 'bc', 'bb', 'fu', 'IM', 'IF', 'a', 'lu', 'FG', 'cu', 'al', 'IH', 'RS', 'pg', 'CF', 'SF', 'ni', 'hc', 'UR', 'm', 'SR', 'j', 'PF', 'RM', 'T', 'c', 'JR', 'l', 'p', 'sp', 'CY', 'pb', 'TF', 'b', 'eg', 'rb', 'PK', 'sn', 'nr', 'pp', 'CJ', 'eb', 'SA', 'y', 'RI', 'lh', 'jd', 'OI', 'WH', 'ss', 'ru', 'zn', 'fb', 'rr', 'PM', 'au', 'TA', 'ZC', 'IC', 'bu', 'SM', 'wr', 'cs', 'LR', 'ag', 'i']
ddb_daily_path = "dfs://daily_futuremarket_ts"
ddb_daily_dbname = "db_daily_fm"
ddb_hft_dbPath="dfs://hft_futuremarket_ts"
ddb_hft_dbName="hft_futuremarket_ts"
@ -35,6 +32,7 @@ class DDBfm():
ddf_hft_mink_tbname="MinKlinePartitioned"
ddf_hft_dailydom_tbname="DailyFutureInfoPartitioned"
def __init__(self, which_server='local', pool=False):
self.pool=None
@ -46,6 +44,8 @@ class DDBfm():
if not pool:
self.sess = ddb.session(self.ddb_config['host'], 8848)
self.sess.login(self.ddb_config['username'], self.ddb_config['password'])
if self.sess.existsDatabase(self.ddb_hft_dbPath):
self.hft_db=self.sess.database(dbPath=self.ddb_hft_dbPath)
else:
self.pool = ddb.DBConnectionPool(self.ddb_config['host'], 8848, 12, self.ddb_config['username'], self.ddb_config['password'])
@ -108,20 +108,33 @@ class DDBfm():
pt = self.hft_db.createPartitionedTable(table=t, tableName=tbName, partitionColumns=['m_nDatetime', 'code_init'])
pt.append(t)
def append_hft_table(self, tbName, df):
    """Append *df* to table *tbName* through the plain (non-pooled) session.

    Args:
        tbName: name of an existing table in the HFT database.
        df: pandas DataFrame whose columns match the table schema.
    """
    appender = ddb.tableAppender(tableName=tbName, ddbSession=self.sess, dbPath=self.ddb_hft_dbPath)
    appender.append(df)
    # fixed typo in log message: "sucessfully" -> "successfully"
    logger.info(f"successfully append some df of {df.shape} to {tbName}")
def append_pool_hft_table(self, tbName, df):
appender = ddb.PartitionedTableAppender(self.ddb_hft_dbPath, tbName, 'm_nDatetime', self.pool)
def append_pool_hft_table(self, tbName, df, sort_col='m_nDatetime'):
    """Append *df* to partitioned table *tbName* via the connection pool.

    sort_col is the partition-routing column handed to
    PartitionedTableAppender ('m_nDatetime' for intraday tables; the
    daily-info caller passes 'm_nDate').
    """
    appender = ddb.PartitionedTableAppender(self.ddb_hft_dbPath, tbName, sort_col, self.pool)
    appender.append(df)
    logger.info(f"Appending some df of {df.shape} to {tbName}")
def create_daily_info_table(self,df):
    """(Re)create the daily future-info partitioned table and load *df* into it.

    Any existing table of the same name is dropped first, so this is a full
    rebuild — previously stored rows are discarded.
    """
    tbName=self.ddf_hft_dailydom_tbname
    t = self.sess.table(data=df, tableAliasName=tbName)
    # drop-and-recreate: this call is destructive for existing data
    if self.sess.existsTable(self.ddb_hft_dbPath,tbName):
        self.sess.dropTable(self.ddb_hft_dbPath,tbName)
    if self.which_server == 'prd':
        # production server: sorted columns + delta compression on m_nDate
        pt = self.hft_db.createPartitionedTable(table=t, tableName=tbName, partitionColumns=['m_nDate', 'code_init'],sortColumns=["code_init","m_nDate"],compressMethods={"m_nDate":"delta"})
    else:
        # other environments: plain partitioned table (no sort/compress options)
        pt = self.hft_db.createPartitionedTable(table=t, tableName=tbName, partitionColumns=['m_nDate', 'code_init'])
    pt.append(t)
def search_code_date_in_tb(self,tbName,curr_date,curr_code):
"""
messy.depracated.
"""
curr_date_formatted = curr_date[:4]+'.'+curr_date[4:6]+'.'+curr_date[6:]
# print('?did i split this right')
# print(curr_date_formatted)
@ -130,7 +143,7 @@ class DDBfm():
try:
# doing this cuz there's no method to check if a table is empty lol
cond=f"code=`{curr_code}, m_nDatetime.date()={curr_date_formatted}d"
cond=f"code_init=`{curr_code[:2]}, m_nDatetime.date()={curr_date_formatted}d"
cond=f"code_init=`{curr_code[:-4]}, m_nDatetime.date()={curr_date_formatted}d"
# print(cond)
df = tb.select('*').where(cond).top(1).toDF()
if df.empty or df.shape[0]==0:
@ -182,6 +195,29 @@ class DDBfm():
logger.warning(f'{c} on {curr_date} is recorded')
return return_code_list
def get_missing_code_init_date_in_tb(self,tbName,curr_date,code_init_list): # code list has to be like all CF 2022-11-11 fixed it
    """Return the members of *code_init_list* that have no rows in *tbName* on *curr_date*.

    Args:
        tbName: table name inside the HFT database.
        curr_date: 'YYYYMMDD' string.
        code_init_list: product-initial codes to check (e.g. ['CF', 'T']).

    Any query failure (e.g. the table does not exist yet) is treated as
    "everything missing" so the caller re-downloads rather than losing data.
    """
    logger.info(f"Quickly checking if data on {'+'.join(code_init_list)} {curr_date} exists...") # could do a slow checking of num of data
    # 'YYYYMMDD' -> 'YYYY.MM.DD', the DolphinDB date-literal format
    curr_date_formatted = curr_date[:4]+'.'+curr_date[4:6]+'.'+curr_date[6:]
    try:
        df = self.sess.run(f"""pt=loadTable("{self.ddb_hft_dbPath}","{tbName}");
        select distinct code_init from pt where m_nDatetime.date()={curr_date_formatted}d and code_init in ({" ".join(["`"+c for c in code_init_list])});
        """)
        if df.empty or df.shape[0]==0:
            return code_init_list
    except Exception as e:
        # was a bare `except:` that swallowed everything silently; keep the
        # best-effort "assume all missing" fallback, but record the cause
        logger.warning(f"existence check failed on {tbName}/{curr_date}: {e}")
        return code_init_list
    ddb_code_list = df['distinct_code_init'].to_list()
    return_code_list=[]
    for c in code_init_list:
        if c not in ddb_code_list:
            return_code_list.append(c)
        else:
            logger.warning(f'{c} on {curr_date} is recorded')
    return return_code_list
if __name__ == '__main__':
pass

@ -234,6 +234,40 @@ class TSLfm:
return df
def get_zl_code_list(self,date):
    """Fetch the real dominant ("ZL") contract code of every futures product on *date*.

    date: 'YYYYMMDD'-style string, interpolated into the TSL script as the
    date literal `{date}T`.
    Returns a single-column DataFrame of contract codes, or an empty
    DataFrame when the TSL call returns nothing / fails.
    """
    # TSL script: enumerate the '期货主力' (dominant futures) virtual codes
    # and resolve each to its actual contract on the given date.
    # NOTE(review): `stocksqls` inside the script appears unused — confirm
    # against the TSL-side conventions before removing.
    scpt="""
zls:= getbk('期货主力');//获取当前所有主力虚拟代码
endt:={date}T;
stocks:=array();
stocksqls:=array();
for i:=0 to length(zls)-1 do
stocks[i]:= ZLToFuturesID(zls[i],endt);//取指定日实际主力合约代码
return stocks; """.format(date=date)
    logger.debug(scpt)
    r = self.c.exec(scpt,timeout=self.timeout_default)
    if r:
        return pd.DataFrame(r.value())
    else:
        # failed or empty response: caller checks for .empty
        return pd.DataFrame()
def get_cont_code_list(self,date):
    """Fetch the real continuous ("CL") contract codes of every futures product on *date*.

    date: 'YYYYMMDD'-style string, interpolated into the TSL script as the
    date literal `{date}T`.
    Returns a single-column DataFrame of contract codes, or an empty
    DataFrame when the TSL call returns nothing / fails.
    """
    # TSL script: enumerate the '期货连续' (continuous futures) virtual codes
    # and resolve each to its actual contract on the given date.
    scpt="""
zls:= getbk('期货连续');//获取当前所有连续虚拟代码
endt:={date}T;
stocks:=array();
stocksqls:=array();
for i:=0 to length(zls)-1 do
stocks[i]:= ZLToFuturesID(zls[i],endt);//取指定日实际主力合约代码
return stocks; """.format(date=date)
    logger.debug(scpt)
    r = self.c.exec(scpt,timeout=self.timeout_default)
    if r:
        return pd.DataFrame(r.value())
    else:
        # failed or empty response: caller checks for .empty
        return pd.DataFrame()
if __name__ == '__main__':
from loguru import logger
logger.add("../logs/{time:YYYYMMDD-HHmmss}_TSLfm.log", rotation="10 MB", compression="zip", level="INFO")

File diff suppressed because one or more lines are too long

@ -1,4 +1,4 @@
from code_list_all_pkl import code_list as code_list_pickel_from_file
from code_list_all_pkl import code_list as code_list_pickel_from_file, all_fm_init_curr
from TSLfm import TSLfm
from DDBfm import DDBfm
import pandas as pd
@ -36,6 +36,9 @@ def run_add_1day_code_init_minKline(date, code_list):
def check_if_date_codelist_exists(typ, date, code_list):
code_list_filtered=[]
for c in code_list:
if c[:-4] not in all_fm_init_curr: #todo code that's not in curr is not counted!!! len: s6
logger.warning("There's unrecognized code init!!!!")
continue
if c[-4:] >= date[2:6]:
code_list_filtered.append(c)
if not code_list_filtered:
@ -48,7 +51,7 @@ def check_if_date_codelist_exists(typ, date, code_list):
tbName = ddb1.ddf_hft_mink_tbname
code_list_filtered = ddb1.get_missing_code_date_in_tb(
tbName, date, code_list)
tbName, date, code_list_filtered)
if code_list_filtered:
logger.info(
@ -60,6 +63,7 @@ def check_if_date_codelist_exists(typ, date, code_list):
return code_list_filtered
def run_pool_add_by_datelist_codeinit(typ, date_list, code_list, if_check=1):
df_list = []
@ -151,11 +155,14 @@ def run_pool_dates_by_code_init_n_group(typ='mink', code_gp_amt=10, date_gp_amt=
for c in code_list_pickel:
if c[-4:] < start_date[2:6]:
continue
if c[:-4] in all_fm_init_curr: #todo code that's not in curr is not counted!!! len: s6
init = c[:-4]
if init in all_code_dict_by_init:
all_code_dict_by_init[init].append(c)
else:
all_code_dict_by_init[init] = [c]
else:
logger.warning("There's unrecognized code init!!!!")
if code_dict_by == 'init':
all_code_dict = all_code_dict_by_init
@ -227,7 +234,7 @@ if __name__ == '__main__':
typ = 'tick'
st_d = '20211201'
en_d = '20211231'
if_check = 1
if_check = True
split_code_into_howmany_groups_no = 20
split_date_into_howmany_groups = 5

@ -0,0 +1,162 @@
from code_list_all_pkl import code_list as code_list_pickel_from_file, code_init_case_mapping, all_fm_init_curr
from TSLfm import TSLfm
from DDBfm import DDBfm
import pandas as pd
from loguru import logger
from os.path import dirname, abspath, join
import sys
running_which_env = 'prd'
def get_code_init_og_code(code, case_mapping=None):
    """Restore the original-case product prefix of a futures contract code.

    Codes coming back from TSL may not use the same case for the product
    initials as the local pickle (e.g. 'cf2209' vs 'CF2209'); map the prefix
    back through the lowercase->original-case table.

    Args:
        code: contract code, product initials followed by 4-digit YYMM
            (e.g. 'cf2209').
        case_mapping: optional {lowercase_init: original_case_init} dict;
            defaults to the module-level code_init_case_mapping.

    Returns:
        The code with its prefix rewritten to the original case.

    Raises:
        ValueError: if *code* is empty/falsy (was: print('?') followed by an
            inevitable KeyError).
        KeyError: if the prefix is unknown to the mapping.
    """
    if not code:
        raise ValueError("empty contract code")
    mapping = code_init_case_mapping if case_mapping is None else case_mapping
    # normalise the prefix: mapping keys are lowercase (see notebook cell that
    # builds code_init_case_mapping via c.lower())
    init_lower = code[:-4].lower()
    return mapping[init_lower] + code[-4:]
def process_zl_cl_data(df_zl, df_cl):
    """Join one day's dominant (ZL) and continuous (CL) contract lists.

    Args:
        df_zl: single-column (0) DataFrame of raw dominant-contract codes.
        df_cl: single-column (0) DataFrame of raw continuous-contract codes.
        Both frames are cleaned in place (dropna/rename), so callers should
        not reuse them afterwards.

    Returns:
        One row per recognized product: code_init, code_dom and up to five
        continuous contracts (code_cont .. code_cont4), merged on code_init.
    """
    # dominant contracts: raw column 0 -> code_dom, derive the product prefix
    df_zl.dropna(inplace=True)
    df_zl.rename(columns={0: 'code_dom'}, inplace=True)
    df_zl['code_dom'] = df_zl['code_dom'].apply(get_code_init_og_code)
    df_zl['code_init'] = df_zl['code_dom'].apply(lambda x: x[:-4])

    # continuous contracts: same cleanup
    df_cl.dropna(inplace=True)
    df_cl.rename(columns={0: 'code_cont'}, inplace=True)
    df_cl['code_cont'] = df_cl['code_cont'].apply(get_code_init_og_code)
    df_cl['code_init'] = df_cl['code_cont'].apply(lambda x: x[:-4])

    # pivot the continuous contracts into one wide row per product
    cont_ind_list = ['code_cont', 'code_cont1',
                     'code_cont2', 'code_cont3', 'code_cont4']  # loop-invariant, hoisted
    df_cl_new_list = []
    for ci, cigp in df_cl.groupby("code_init"):
        if ci not in all_fm_init_curr:
            logger.warning("There's unrecognized code init!!!!")
            continue
        df_cont = pd.DataFrame([], columns=['code_init'] + cont_ind_list)
        df_cont.loc[0, 'code_init'] = ci
        all_cont = sorted(cigp['code_cont'])
        if len(all_cont) > len(cont_ind_list):
            # previously raised IndexError; keep the five earliest and warn
            logger.warning(
                f"{ci}: {len(all_cont)} continuous contracts, keeping first {len(cont_ind_list)}")
        # zip stops at the shorter sequence, so extra contracts are dropped safely
        for slot, cont in zip(cont_ind_list, all_cont):
            df_cont.loc[0, slot] = cont
        df_cl_new_list.append(df_cont)
    if not df_cl_new_list:
        # nothing recognized: pd.concat([]) would raise ValueError; an empty
        # right side yields an empty merge result instead
        return pd.merge(df_zl, pd.DataFrame(columns=['code_init'] + cont_ind_list), on='code_init')
    df_cl_new = pd.concat(df_cl_new_list)
    return pd.merge(df_zl, df_cl_new, on='code_init')
def get_zl_cl_df_by_date(start_date, end_date, date_gp_amt=10):
    """Fetch and merge ZL/CL contract info for every day in [start_date, end_date].

    Args:
        start_date, end_date: 'YYYYMMDD' strings (inclusive range).
        date_gp_amt: days handled per TSLfm session; a fresh session is
            opened for each group to bound connection lifetime.

    Returns:
        A DataFrame with an m_nDate column, or an empty DataFrame when no day
        yielded data (previously pd.concat([]) raised ValueError).
    """
    date_list = []
    allDates = pd.date_range(start_date, end_date, freq='D')
    for d in allDates.astype('str'):
        date_list.append(d.replace('-', ''))
    df_list = []
    # split the dates into consecutive groups of date_gp_amt
    date_list_group = {}
    for ind, date in enumerate(date_list):
        gp_no = int(ind/date_gp_amt)
        if gp_no not in date_list_group:
            date_list_group[gp_no] = [date]
        else:
            date_list_group[gp_no].append(date)
    print(date_list_group)
    for gp_no in date_list_group:
        gp_dates = date_list_group[gp_no]  # was shadowing the outer `date_list`
        with TSLfm() as tsl:
            for date in gp_dates:
                df_zl = tsl.get_zl_code_list(date)
                df_cl = tsl.get_cont_code_list(date)
                if df_cl.empty and df_zl.empty:
                    # e.g. non-trading day: nothing to merge
                    continue
                df = process_zl_cl_data(df_zl, df_cl)
                df['m_nDate'] = pd.to_datetime(date, format='%Y%m%d')
                df_list.append(df)
    if not df_list:
        logger.warning(f'no zl/cl data between {start_date} and {end_date}')
        return pd.DataFrame()
    df_all = pd.concat(df_list)
    logger.debug(f'getting zl+cl data of \n{df_all}')
    return df_all
def check_if_date_codeinitlist_exists(date, code_init_list):
    """Return the code inits from *code_init_list* that are still missing from
    the daily dominant-contract table on *date*.

    Unknown code inits (not in all_fm_init_curr) are logged and dropped
    before the database check.
    """
    # flag anything outside the known product universe, then keep the rest
    for ci in code_init_list:
        if ci not in all_fm_init_curr:
            logger.warning(f"There's unrecognized code init: {ci}!!!!")
    remaining = [ci for ci in code_init_list if ci in all_fm_init_curr]
    if not remaining:
        return remaining
    db = DDBfm(running_which_env)
    remaining = db.get_missing_code_init_date_in_tb(
        db.ddf_hft_dailydom_tbname, date, remaining)
    if remaining:
        logger.info(
            f"Need to download {'+'.join(remaining)} on {date} in {db.ddf_hft_dailydom_tbname}")
    else:
        logger.info(
            f"all codes checked in database tb {db.ddf_hft_dailydom_tbname} on {date}")
    db.close_sess()
    del db
    return remaining
def run_create_zl_table_in_db(date='20221101'):
    """Build the daily dominant/continuous-contract table from a single day's data.

    Destructive: DDBfm.create_daily_info_table drops any existing table.
    """
    logger.info(f"creating zl cl table on {date}")
    seed_df = get_zl_cl_df_by_date(date, date)
    db = DDBfm(running_which_env)
    db.create_daily_info_table(seed_df)
def run_pool_append_zl_table_in_db(start_date, end_date, if_check=True):
    """Fetch ZL/CL contract info for the date range and pool-append it to the
    daily dominant-contract table.

    NOTE(review): `if_check` is currently unused — the duplicate check below
    is commented out, so re-running an overlapping range can insert duplicate
    rows. TODO: confirm intent and either wire the check in or drop the flag.
    """
    logger.info(f"Running append zl cl table from {start_date} to {end_date}")
    df = get_zl_cl_df_by_date(start_date, end_date)
    ddbfm = DDBfm(running_which_env, pool=True)
    # for date
    # if if_check:
    # check_if_date_codeinitlist_exists(date,df['code_init'].tolist())
    # daily table partitions on m_nDate, hence the non-default sort_col
    ddbfm.append_pool_hft_table(
        ddbfm.ddf_hft_dailydom_tbname, df, sort_col='m_nDate')
if __name__ == '__main__':
    # run parameters: table type, inclusive date range, duplicate-check flag
    typ = 'dailydom'
    st_d = '20220101'
    en_d = '20221031'
    if_check = False

    # log to stderr at INFO and to a per-run, zip-rotated file at DEBUG
    ROOT_DIR = abspath(join(dirname(abspath(__file__)), ".."))
    logger.remove()
    logger.add(sys.stderr, level="INFO")
    logger.add(ROOT_DIR+"/logs/{time:YYYYMMDD-HHmmss}"+f"_{running_which_env}_{typ}_{st_d}_{en_d}_{if_check}.log",
               rotation="10 MB", compression="zip", level="DEBUG")
    # fixed typo in the banner message: "dupliactes" -> "duplicates"
    logger.warning(
        f"Going to run *{typ}* from {st_d} to {en_d} with if_check duplicates={if_check} in *{running_which_env}*, plz check if this info is correct.\n\n\n\n")

    import time
    tic = time.perf_counter()
    # run_create_zl_table_in_db()  # one-off: rebuild the table from scratch
    run_pool_append_zl_table_in_db(
        start_date=st_d, end_date=en_d, if_check=if_check)
    toc = time.perf_counter()
    logger.info(f"Running used {toc - tic:0.4f} seconds")

@ -3,23 +3,29 @@ getDFSDatabases();
getScheduledJobs();
loadTableBySQL(tableName='pt',dbPath="dfs://tsdb",sql="select * limit 10;");
loadTable("dfs://tsdb","pt")
loadTable("dfs://hft_stock_ts","KLinePartitioned")
listTables("dfs:/hft_futuremarket_ts");
pt=loadTable("dfs://hft_futuremarket_ts","MinKlinePartitioned")
pt=loadTable("dfs://hft_futuremarket_ts","TickPartitioned")
pt=loadTable("dfs://hft_futuremarket_ts", "DailyFutureInfoPartitioned")
select top 40 * from pt where code=`T2212 and m_nDatetime.date() =2022.09.28d
select top 4 * from pt where code_init=`T and m_nDatetime.date()=2022.09.28d
select distinct m_nDatetime.date() from pt where m_nDatetime.date()=2022.11.01d and (code=`AP2211 or code=`AP2212)
select distinct code from pt
select distinct code from pt where m_nDatetime.date()=2022.11.01d and code in (`LR2211)
select * from pt where m_nDatetime.date()=2022.11.01d and code in (`al2212)
select count(*) from loadTable("dfs://hft_futuremarket_ts", "MinKlinePartitioned")
select count(*) from loadTable("dfs://hft_futuremarket_ts", "TickPartitioned")
select count(*) from loadTable("dfs://hft_futuremarket_ts", "DailyFutureInfoPartitioned")
select top 40 * from pt
pt=loadTable("dfs://daily_stock_ts","daily_kline")
select top 400 * from pt where code_init=`T
schema(pt)
select count(*) from pt
n=1000000
ID=rand(100, n)
@ -36,3 +42,5 @@ pt.append!(t)
pt=loadTable(db,`pt)
select count(x) from pt
db = database("dfs://hft_futuremarket_ts")

Loading…
Cancel
Save