updated methods

main
yzlocal 2 years ago
parent 82d3c28895
commit a1e27ed251

@ -39,7 +39,8 @@ class DDBfm():
def __init__(self, which_server='local', **kwargs): def __init__(self, which_server='local', **kwargs):
self.hft_db=None self.hft_db=None
self.ddb_config = self.ddb_config_servers[which_server] self.which_server=which_server
self.ddb_config = self.ddb_config_servers[self.which_server]
self.sess = ddb.session(self.ddb_config['host'], 8848) self.sess = ddb.session(self.ddb_config['host'], 8848)
self.sess.login(self.ddb_config['username'], self.ddb_config['password']) self.sess.login(self.ddb_config['username'], self.ddb_config['password'])
@ -64,32 +65,45 @@ class DDBfm():
if self.sess.existsDatabase(dbPath): if self.sess.existsDatabase(dbPath):
self.sess.dropDatabase(dbPath) self.sess.dropDatabase(dbPath)
if self.which_server == 'prd':
self.hft_db = self.sess.database(dbName=dbName, partitionType=keys.COMPO, partitions=[db_date, db_init], dbPath=dbPath, engine='TSDB')
else:
self.hft_db = self.sess.database(dbName=dbName, partitionType=keys.COMPO, partitions=[db_date, db_init], dbPath=dbPath, engine='OLAP') self.hft_db = self.sess.database(dbName=dbName, partitionType=keys.COMPO, partitions=[db_date, db_init], dbPath=dbPath, engine='OLAP')
# todo: engine=TSDB logger.info(f'Created dbName:{dbName} at dbPath:{dbPath} in {self.which_server}.')
# db = self.sess.database(dbName=dbName, partitionType=keys.COMPO, partitions=[db_date, db_init], dbPath=dbPath, engine='TSDB')
logger.info(f'Created dbName:{dbName} at dbPath:{dbPath}')
def create_hft_table(self, tbName, df): def create_hft_table(self, tbName, df):
t = self.sess.table(data=df, tableAliasName=tbName) t = self.sess.table(data=df, tableAliasName=tbName)
if self.which_server == 'prd':
pt = self.hft_db.createPartitionedTable(table=t, tableName=tbName, partitionColumns=['m_nDatetime', 'code_init'],sortColumns=["code","m_nDatetime"])
else:
pt = self.hft_db.createPartitionedTable(table=t, tableName=tbName, partitionColumns=['m_nDatetime', 'code_init']) pt = self.hft_db.createPartitionedTable(table=t, tableName=tbName, partitionColumns=['m_nDatetime', 'code_init'])
# pt = self.hft_db.createPartitionedTable(table=t, tableName=tbName, partitionColumns=['m_nDate', 'code_init'],sortColumns=["code","m_nDatetime"])
pt.append(t) pt.append(t)
def append_hft_table(self, dbPath, tbName, df):
# load table to check? time&date&code
appender = ddb.tableAppender(tableName=tbName, ddbSession=self.sess,dbPath=dbPath)
appender.append(df)
def append_hft_table(self, tbName, df):
# load table to check? time&date&code
# print(df.shape)
appender = ddb.tableAppender(tableName=tbName, ddbSession=self.sess,dbPath=self.ddb_hft_dbPath)
appender.append(df)
def main(): def search_code_date_in_tb(self,tbName,curr_date,curr_code):
curr_date_formatted = curr_date[:4]+'.'+curr_date[4:6]+'.'+curr_date[6:]
# print('?did i split this right')
# print(curr_date_formatted)
tb = self.sess.loadTable(dbPath=self.ddb_hft_dbPath, tableName=tbName)
try:
# doing this cuz there's no method to check if a table is empty lol
df = tb.select('*').where(f"code=`{curr_code}").where(f"m_nDatetime>={curr_date_formatted}d").top(1).toDF()
print(df)
except:
return 0
return 1
ddb = DDBfm()
ddb.create_ddb_database()
if __name__ == '__main__': if __name__ == '__main__':
main() pass

@ -212,10 +212,13 @@ class TSLfm:
def process_result_data_type(self, r): def process_result_data_type(self, r):
df = pd.DataFrame(r.value()) df = pd.DataFrame(r.value())
if df.empty:
logger.info('No data on this day.')
return 0
logger.info(f"Processing new df of shape {df.shape}, which looks like\n{df.head(5)}") logger.info(f"Processing new df of shape {df.shape}, which looks like\n{df.head(5)}")
# new = df["m_nDatetime"].str.split(" ", n = 1, expand = True) # new = df["m_nDatetime"].str.split(" ", n = 1, expand = True)
df["m_nDatetime"]=df["m_nDatetime"].astype('datetime64[s]') df["m_nDatetime"]=df["m_nDatetime"].astype('datetime64')
# df['m_nDate']=new[0].astype('datetime64[D]') # df['m_nDate']=new[0].astype('datetime64[D]')
df['code_init']=df['code'].apply(lambda x:x[:-4]) df['code_init']=df['code'].apply(lambda x:x[:-4])

File diff suppressed because one or more lines are too long

@ -1,40 +1,91 @@
import sys import sys
# import os
from loguru import logger running_which_env='prd'
from os.path import dirname, abspath, join from os.path import dirname, abspath, join
ROOT_DIR = abspath(join(dirname(abspath(__file__)), "..")) ROOT_DIR = abspath(join(dirname(abspath(__file__)), ".."))
from loguru import logger from loguru import logger
logger.remove() logger.remove()
logger.add(sys.stderr, level="INFO") logger.add(sys.stderr, level="INFO")
logger.add(ROOT_DIR+"/logs/{time:YYYYMMDD-HHmmss}.log", rotation="10 MB", compression="zip", level="INFO") logger.add(ROOT_DIR+"/logs/{time:YYYYMMDD-HHmmss}_{running_which_env}.log", rotation="10 MB", compression="zip", level="INFO")
import pandas as pd
from DDBfm import DDBfm from DDBfm import DDBfm
from TSLfm import TSLfm from TSLfm import TSLfm
from code_list import code_list_pickel
def main(): def run_add_1day_code_init_minKline(date,code_list):
df=None
with TSLfm() as tsl: with TSLfm() as tsl:
# code_list = tsl.get_code_list("国债期货") df = tsl.process_result_data_type(tsl.get_mkt_min_k(date,date,code_list))
# code_list += tsl.get_code_list("股指期货") ddb = DDBfm(running_which_env)
# code_list += tsl.get_code_list("上市期货")
# code_list=sorted(list(set(code_list)))
begin_date='20221031'
end_date='20221101'
for code in code_list:
if ddb.search_code_date_in_tb(ddb.ddf_hft_mink_tbname,date,code):
logger.warning(f"Possible duplicates on {date} and {code}")
return 0
ddb.append_hft_table(ddb.ddf_hft_mink_tbname,df)
def run_create_db_minKline():
date = '20221101'
with TSLfm() as tsl:
code_list=['CF2211'] code_list=['CF2211']
df = tsl.process_result_data_type(tsl.get_mkt_min_k(begin_date,end_date,code_list)) df = tsl.process_result_data_type(tsl.get_mkt_min_k(date,date,code_list))
print(df) # print(df)
ddb = DDBfm(running_which_env)
ddb = DDBfm('dev')
ddb.create_hft_database() ddb.create_hft_database()
ddb.create_hft_table(ddb.ddf_hft_mink_tbname,df) ddb.create_hft_table(ddb.ddf_hft_mink_tbname,df)
def run():
all_code_dict_by_init={}
for c in code_list_pickel:
init = c[:-4]
if init in all_code_dict_by_init:
all_code_dict_by_init[init].append(c)
else:
all_code_dict_by_init[init]=[c]
# print(all_code_dict_by_init)
start_date='2022-10-31'
end_date='2022-11-08'
allDates = pd.date_range(start_date, end_date, freq ='D')
allDates = [i.replace('-','') for i in list(allDates.astype('str'))]
for date in allDates:
for code_init in all_code_dict_by_init:
code_list = all_code_dict_by_init[code_init]
run_add_1day_code_init_minKline(date,code_list)
# date = '20221101'
# with TSLfm() as tsl:
# # code_list = tsl.get_code_list("国债期货")
# # code_list += tsl.get_code_list("股指期货")
# # code_list += tsl.get_code_list("上市期货")
# # code_list=sorted(list(set(code_list)))
# # print(code_list_pickel)
# code_list=['CF2211']
# df = tsl.process_result_data_type(tsl.get_mkt_min_k(date,date,code_list))
# print(df)
# ddb = DDBfm('prd')
# ddb.create_hft_database()
# ddb.create_hft_table(ddb.ddf_hft_mink_tbname,df)
# if ddb.search_code_date_in_tb(ddb.ddf_hft_mink_tbname,date,'CF2211'):
# logger.warning(f"Possible duplicates on {date} and ")
# ddb.append_hft_table(ddb.ddf_hft_mink_tbname,df)
if __name__ == '__main__': if __name__ == '__main__':
main() run()
Loading…
Cancel
Save