|
|
|
@ -394,6 +394,7 @@ class DDBHFTLoader(DDBLoader):
|
|
|
|
|
row_list = [row for row in row_list
|
|
|
|
|
if pd.to_datetime(row[1]) not in _journal_dt.index]
|
|
|
|
|
print(f"Resume job for {stock_id}, with {len(row_list)} rows left.")
|
|
|
|
|
del(_journal_dt)
|
|
|
|
|
|
|
|
|
|
num_rows = len(row_list)
|
|
|
|
|
# 如果行数为0,则说明是空数据,可以直接返回
|
|
|
|
@ -426,6 +427,7 @@ class DDBHFTLoader(DDBLoader):
|
|
|
|
|
):
|
|
|
|
|
sub_pbar.update()
|
|
|
|
|
|
|
|
|
|
del(row_list)
|
|
|
|
|
self.dump_journal_writer.write(f"{type_name},{stock_id},OK\n")
|
|
|
|
|
self.dump_journal_writer.flush()
|
|
|
|
|
|
|
|
|
@ -477,8 +479,8 @@ class DDBHFTLoader(DDBLoader):
|
|
|
|
|
|
|
|
|
|
ddb_sess.upload({df_table_name : df})
|
|
|
|
|
# 因为在做Tick数据的时候,偶然发生'CHUNK[xxx] does not exist.',所以在这里使用`append!`函数代换一下试试
|
|
|
|
|
ddb_sess.run("append!(loadTable('{dbPath}', `{partitioned_table_name}), {df_table_name})".format(
|
|
|
|
|
#ddb_sess.run("tableInsert(loadTable('{dbPath}', `{partitioned_table_name}), {df_table_name})".format(
|
|
|
|
|
#ddb_sess.run("append!(loadTable('{dbPath}', `{partitioned_table_name}), {df_table_name})".format(
|
|
|
|
|
ddb_sess.run("tableInsert(loadTable('{dbPath}', `{partitioned_table_name}), {df_table_name})".format(
|
|
|
|
|
dbPath = DDBHFTLoader.ddb_path,
|
|
|
|
|
partitioned_table_name = type_name + DDBHFTLoader.ddb_partition_table_suffix,
|
|
|
|
|
df_table_name = df_table_name
|
|
|
|
@ -486,5 +488,6 @@ class DDBHFTLoader(DDBLoader):
|
|
|
|
|
|
|
|
|
|
# 由于不是复用`DDBHFTLoader`对象内部的Session,因此如果不手动关闭就会造成内存逐渐泄漏
|
|
|
|
|
ddb_sess.close()
|
|
|
|
|
del(df)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|