import itertools
import time
from datetime import datetime
from pprint import pprint

import pandas as pd
import pyTSL
from tqdm import tqdm

# Connection settings for the TinySoft (TSL) server.
# NOTE(review): the credentials below are hard-coded in source; consider
# loading them from the environment or a secrets store instead.
TINYSOFT_HOSTNAME = 'tsl.tinysoft.com.cn'
TINYSOFT_PORT = 443
TINYSOFT_USERNAME = 'xunuo2005'
TINYSOFT_PASSWORD = '20220613'


class tsl():
    """Context manager around a ``pyTSL.Client`` session.

    Entering the context creates a client from the module-level
    ``TINYSOFT_*`` settings and logs in; leaving it logs out and drops the
    client.  Each query method builds a TSL script as a string, runs it with
    ``self.c.exec`` and returns the result (usually as a DataFrame).

    Example::

        with tsl() as session:
            stocks = session.get_stock_list()

    NOTE(review): the scripts are assembled by plain string formatting, so
    arguments must come from trusted callers.
    """

    def __enter__(self):
        """Create the client, log in and return this wrapper."""
        self.c = pyTSL.Client(
            TINYSOFT_USERNAME,
            TINYSOFT_PASSWORD,
            TINYSOFT_HOSTNAME,
            TINYSOFT_PORT,  # was a literal 443 that ignored the constant
        )
        # NOTE(review): the return value of login() is not checked here.
        self.c.login()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Log out and drop the client; exceptions are never suppressed."""
        try:
            self.c.logout()
        finally:
            # Drop the client even when logout() itself raises.
            del self.c

    @staticmethod
    def _set_index_if_present(df, keys):
        """Index ``df`` in place by ``keys`` when all of them are columns."""
        if all(key in df.columns for key in keys):
            df.set_index(list(keys), inplace=True)
        return df

    def get_stock_list(self, date=None, bk_name='A股'):
        """Return the members of a stock board.

        Args:
            date: When ``None``, the fixed board string
                ``'A股;暂停上市;终止上市'`` is queried via ``getbk`` and
                ``bk_name`` is ignored.  Otherwise an integer date
                (formatted with ``%d``; presumably yyyymmdd) passed to
                ``getAbkbydate``.
            bk_name: Board name, only used when ``date`` is given.

        Returns:
            Whatever ``value()`` of the exec result yields.
        """
        if date is None:
            code = """
                return getbk('A股;暂停上市;终止上市');
            """
        else:
            code = """
                bkName := "%s";
                endT := %d;
                return getAbkbydate(bkName,endT);
            """ % (bk_name, date)
        return self.c.exec(code).value()

    def get_cmp_report(self, table_id, stock_id, start_date):
        """Select rows of an ``infotable`` for one stock.

        Args:
            table_id: Identifier of the infotable to read.
            stock_id: Stock code, inserted as a quoted TSL string.
            start_date: Lower bound (inclusive) compared against the
                ``'截止日'`` column; inserted unquoted into the script.

        Returns:
            ``pandas.DataFrame`` built from the query result.
        """
        code = """
            SetSysParam('ReportMode', -1);
            return Select * from infotable {table_id} of '{stock_id}'
            where ['截止日'] >= {start_date} end;
        """.format(
            table_id=table_id,
            stock_id=stock_id,
            start_date=start_date,
        )
        r = self.c.exec(code)
        return pd.DataFrame(r.value())

    def get_cmp_indicator(self, stock_id, start_year, indicator_config_fname):
        """Fetch report indicators for every quarter end since ``start_year``.

        Args:
            stock_id: Stock code, inserted as a quoted TSL string.
            start_year: First year (inclusive); quarter ends are generated
                up to and including the current year.
            indicator_config_fname: CSV file with ``indicator_id`` and
                ``indicator_name`` columns.

        Returns:
            ``pandas.DataFrame`` indexed by ``['StockID', '报告期']`` with one
            column per indicator name.  When the result lacks those columns
            (e.g. it is empty) the frame is returned without that index.
        """
        index_list = ['StockID', '报告期']
        end_year = datetime.now().year + 1
        quarter_list = ['0331', '0630', '0930', '1231']
        report_date_list = [
            str(year) + quarter
            for year, quarter in itertools.product(
                range(start_year, end_year), quarter_list
            )
        ]

        config_df = pd.read_csv(indicator_config_fname)
        config_df['indicator_id'] = config_df['indicator_id'].astype('string')
        indicator_dict = (
            config_df.set_index('indicator_id')['indicator_name'].to_dict()
        )
        # Keys and values of a dict iterate in the same order, so the id and
        # name lists stay aligned position by position.
        indicator_id_list = list(indicator_dict.keys())
        indicator_name_list = [
            "'%s'" % indicator_dict[indicator_id].strip()
            for indicator_id in indicator_id_list
        ]

        code = """
            SetSysParam('ReportMode', -1);
            SetSysParam(pn_reportmode(), -1);
            SetSysParam(pn_stock(), '{stock_id}');
            date_array := Array({date_list_input});
            indicator_array := Array({indicator_id_list_input});
            indicator_name_array := Array({indicator_name_list_input});
            r := Array();
            for i:=0 to length(date_array)-1 do
            begin
                r[i]['StockID'] := '{stock_id}';
                r[i]['报告期'] := date_array[i];
                for j:=0 to length(indicator_array)-1 do
                begin
                    r[i][indicator_name_array[j]] := ReportOfAll(indicator_array[j], date_array[i]);
                end
            end
            return r;
        """.format(
            stock_id=stock_id,
            date_list_input=', '.join(report_date_list),
            indicator_id_list_input=', '.join(indicator_id_list),
            indicator_name_list_input=', '.join(indicator_name_list),
        )
        rows = self.c.exec(code).value()
        df = pd.DataFrame(rows, index=range(len(rows)))
        # Guarded like the market-data methods so an empty result does not
        # raise KeyError.
        return self._set_index_if_present(df, index_list)

    def get_mkt_trading_days(self, start_date, end_date):
        """Return the trading days between two integer dates.

        Args:
            start_date: Start date as an integer (converted with
                ``inttodate``; presumably yyyymmdd).
            end_date: End date as an integer, same format.

        Returns:
            Whatever ``value()`` of the exec result yields (used by
            ``get_mkt_stock_k_daily_ext`` as an iterable of dates).
        """
        # TradeDays:
        # nday3: returns a one-dimensional, numerically indexed array of the
        #        N days, computed with the second argument.
        # SpecDate: shortcut for temporarily changing the current time.
        # Spec: makes the given StockID the current stock, evaluates the
        #       expression and returns its value.
        code = """
            begt:=inttodate(%d);
            endt:=inttodate(%d);
            return spec(specdate(nday3(tradedays(begt,endt),DateToInt(sp_time())),endt),'SH000001');
        """ % (start_date, end_date)
        r = self.c.exec(code)
        return r.value()

    def get_mkt_stock_k_1min(self, date, stock_list):
        """Query 1-minute bars of several stocks for a single day.

        Args:
            date: Day to query; inserted into the script as ``{date}T`` for
                both ends of the ``datekey`` range.
            stock_list: Iterable of stock codes.

        Returns:
            The raw result object of ``self.c.exec`` (not converted; unlike
            the other methods, ``value()`` is left to the caller).
        """
        stock_list_input = ', '.join(
            "'%s'" % stock_id for stock_id in stock_list
        )
        code = """
            SetSysParam(pn_cycle(), cy_1m());
            stock_list := Array({stock_list});
            r := select ['StockID'], DateTimeToStr(['date']) as 'time',
                ['open'], ['high'], ['low'], ['close'], ['vol'], ['amount'],
                ['yclose'] as 'AdjPreClose',
                (['close']/['yclose'] - 1) * 100 as 'PctChg'
                from markettable datekey {start_date}T to {end_date}T+0.999 of stock_list end;
            return r;
        """.format(
            stock_list=stock_list_input,
            start_date=date,
            end_date=date,
        )
        return self.c.exec(code)

    def get_mkt_stock_k_daily(self, start_date, end_date, stock_id):
        """Query daily bars of one stock.

        Args:
            start_date: First day, inserted as ``{start_date}T``.
            end_date: Last day, inserted as ``{end_date}T+0.999``.
            stock_id: Stock code, inserted as a quoted TSL string.

        Returns:
            Tuple ``(df, r)``: the DataFrame (indexed by
            ``['StockID', 'date']`` when both columns exist) and the raw
            exec result.
        """
        # vol: trading volume
        # amount: turnover amount
        # cjbs: number of trades
        # yclose: previous price
        code = """
            SetSysParam(pn_cycle(),cy_day());
            return select ['StockID'], datetoint(['date']) as 'date',
                ['open'], ['high'], ['low'], ['close'], ['vol'], ['amount'],
                ['cjbs'], ['yclose']
                from markettable datekey {start_date}T to {end_date}T+0.999 of '{stock_id}' end;
        """.format(
            stock_id=stock_id,
            start_date=start_date,
            end_date=end_date,
        )
        r = self.c.exec(code)
        df = pd.DataFrame(r.value())
        self._set_index_if_present(df, ['StockID', 'date'])
        return df, r

    def get_mkt_stock_k_daily_ext(self, start_date, end_date, stock_id):
        """Query per-day extras (PctChg, IsST, FloatShares, Factor).

        The trading days are first obtained from ``get_mkt_trading_days``;
        the script then evaluates the indicators for each of those days.

        Args:
            start_date: Start date as an integer.
            end_date: End date as an integer.
            stock_id: Stock code, inserted as a quoted TSL string.

        Returns:
            Tuple ``(df, r)``: the DataFrame (indexed by
            ``['StockID', 'date']`` when both columns exist) and the raw
            exec result.
        """
        date_list = self.get_mkt_trading_days(start_date, end_date)
        # IsSt_3: whether the current stock is an ST stock on the given day;
        #         the function decides mainly from name changes.
        # Stockzf3: percent change (%) = (close - previous close)
        #           / previous close * 100% (price-adjusted).
        # http://www.tinysoft.com.cn/tsdn/helpdoc/display.tsl?id=12582
        # AuthorityFactor: uses the listing date as the adjustment base date
        #                  (backward adjustment) and returns the adjustment
        #                  factor of the given day under proportional
        #                  adjustment, i.e. adjustment mode 1.  The current
        #                  stock and current time must be set before calling.
        code = """
            SetSysParam(pn_cycle(),cy_day());
            SetSysParam(pn_stock(), '{stock_id}');
            date_array := Array({date_list_input});
            r := Array();
            for i:=0 to length(date_array)-1 do
            begin
                SetSysParam(PN_Date(), inttodate(date_array[i]));
                r[i]['StockID'] := '{stock_id}';
                r[i]['date'] := date_array[i];
                r[i]['PctChg'] := StockZf3();
                r[i]['IsST'] := IsST_3(date_array[i]);
                r[i]['FloatShares'] := StockNegotiableShares(date_array[i]);
                r[i]['Factor'] := AuthorityFactor();
            end;
            return r;
        """.format(
            stock_id=stock_id,
            date_list_input=', '.join(str(date) for date in date_list),
        )
        r = self.c.exec(code)
        rows = r.value()  # fetched once instead of twice
        df = pd.DataFrame(rows, index=range(len(rows)))
        self._set_index_if_present(df, ['StockID', 'date'])
        return df, r

    def get_fund_info(self, table_id, fund_id):
        """Select all rows of an ``infotable`` for one fund.

        Args:
            table_id: Identifier of the infotable to read.
            fund_id: Fund code, inserted as a quoted TSL string.

        Returns:
            ``pandas.DataFrame`` built from the query result.
        """
        code = """
            SetSysParam('ReportMode', -1);
            return Select * from infotable {table_id} of '{fund_id}' end;
        """.format(
            table_id=table_id,
            fund_id=fund_id,
        )
        r = self.c.exec(code)
        return pd.DataFrame(r.value())

    def get_fund_list(self, date):
        """Placeholder: not implemented yet, always returns ``None``."""
        pass