#!/usr/bin/env python
"""
Qlib Loader Utility - Load data using the gold-standard handler.yaml configuration.
This module provides a wrapper around qlib's AggHandler that allows specifying
both start and end dates for data loading, unlike the original handler.yaml
which always loads until today.
"""
import os
import sys
import pprint
import datetime
from pathlib import Path
from typing import Optional, Dict, Any
import pandas as pd
from ruamel.yaml import YAML
# NumPy 2.0 compatibility: np.NaN was removed in NumPy 2.0
# This must be set BEFORE importing qlib modules that use np.NaN
import numpy as np
if not hasattr(np, 'NaN'):
    np.NaN = np.nan
# Add qlib imports
import qlib
from qlib.utils import (
    init_instance_by_config,
    fill_placeholder
)
from qlib.contrib.utils import load_placehorder_from_module
# Path to the modified handler.yaml
# qlib_loader.py is at: stock_1d/d033/alpha158_beta/src/qlib_loader.py
# handler.yaml is at: stock_1d/d033/alpha158_beta/config/handler.yaml
CURRENT_DIR = Path(__file__).parent # src/
PROJECT_DIR = CURRENT_DIR.parent # alpha158_beta/
HANDLER_YAML_PATH = PROJECT_DIR / "config" / "handler.yaml"
# Original handler.yaml path (for reference)
ORIGINAL_HANDLER_YAML_PATH = "/home/guofu/Workspaces/alpha/data_ops/tasks/dwm_feature_vae/dataset/csiallx_feature2_ntrla_flag_pnlnorm/handler.yaml"
def load_data_from_handler(
    since_date: str,
    end_date: str,
    buffer_days: int = 20,
    yaml_path: Optional[str] = None,
    verbose: bool = True
) -> pd.DataFrame:
    """
    Load data using qlib's AggHandler with a configurable date range.

    Args:
        since_date: Start date for the data (YYYY-MM-DD or datetime-like)
        end_date: End date for the data (YYYY-MM-DD or datetime-like)
        buffer_days: Extra days to load before since_date for diff calculations (default: 20)
        yaml_path: Path to handler.yaml (default: uses the modified version in config/)
        verbose: Print debug information

    Returns:
        pd.DataFrame: Loaded data with MultiIndex (datetime, instrument)

    Notes:
        - buffer_days is needed because the Diff processor calculates
          period-over-period changes, which requires looking back in time.
        - The result is filtered to [since_date, end_date] before being
          returned, so the buffer period data is already removed.
    """
    # Resolve yaml path
    if yaml_path is None:
        yaml_path = HANDLER_YAML_PATH
    yaml_path = Path(yaml_path)
    if not yaml_path.exists():
        raise FileNotFoundError(f"handler.yaml not found at {yaml_path}")

    # Convert dates to datetime if given as strings
    if isinstance(since_date, str):
        since_date = pd.to_datetime(since_date)
    if isinstance(end_date, str):
        end_date = pd.to_datetime(end_date)

    # Calculate the load start (with buffer for diff calculations)
    load_start = since_date - pd.Timedelta(days=buffer_days)

    if verbose:
        print("=" * 60)
        print("Loading data from handler.yaml")
        print("=" * 60)
        print(f" Requested range: {since_date.date()} to {end_date.date()}")
        print(f" Buffer days: {buffer_days}")
        print(f" Actual load range: {load_start.date()} to {end_date.date()}")
        print(f" Handler yaml: {yaml_path}")

    # Load yaml config
    yaml_loader = YAML(typ='safe', pure=True)
    with open(yaml_path) as f:
        config = yaml_loader.load(f)

    # Initialize qlib
    from qlib.workflow.cli import sys_config
    config_path = "qlib.contrib.data.config"
    sys_config(config, config_path)
    qlib.init(**config.get("qlib_init"))

    # Prepare placeholder values
    placeholder_value = {
        "<LOAD_START>": load_start,
        "<LOAD_END>": end_date,
    }
    # Also load placeholders from the handler module if available
    try:
        placeholder_value.update(
            load_placehorder_from_module(config["handler"])
        )
    except Exception as e:
        if verbose:
            print(f" Note: Could not load placeholders from handler module: {e}")

    # Fill placeholders in config
    config = fill_placeholder(config, placeholder_value)

    if verbose:
        print("\nHandler config after filling placeholders:")
        pprint.pprint(config)

    # Initialize the handler and load data
    handler = init_instance_by_config(config["handler"])

    # Take the underlying data
    data = handler._data

    if verbose:
        # SepDataFrame doesn't have .shape, convert to DataFrame first
        data_df = data.to_frame() if hasattr(data, 'to_frame') else data
        print(f"\nLoaded data shape: {data_df.shape}")
        print(f"Data index levels: {data_df.index.names}")
        print(f"Data columns: {list(data_df.columns)[:20]}...")
        print(f"\nFiltering to requested range: {since_date.date()} to {end_date.date()}")

    # Filter to the requested date range (remove the buffer period)
    if isinstance(data.index, pd.MultiIndex):
        data = data.loc(axis=0)[slice(since_date, end_date), :]
    else:
        data = data.loc[slice(since_date, end_date)]

    if verbose:
        # Again handle SepDataFrame
        data_df = data.to_frame() if hasattr(data, 'to_frame') else data
        print(f"Filtered data shape: {data_df.shape}")
        print("=" * 60)

    return data
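
# --- Usage sketch (illustrative, not from any production run) ---
# The dates below are arbitrary example values; any range covered by the
# configured qlib provider works the same way.
#
#     df = load_data_from_handler(
#         since_date="2021-01-01",
#         end_date="2021-03-31",
#         buffer_days=20,
#         verbose=False,
#     )
#     # df is indexed by (datetime, instrument) and already trimmed to
#     # [since_date, end_date]; the buffer rows only exist during loading.
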
def load_data_with_proc_list(
    since_date: str,
    end_date: str,
    proc_list_path: Optional[str] = None,
    buffer_days: int = 20,
    yaml_path: Optional[str] = None,
    verbose: bool = True
) -> pd.DataFrame:
    """
    Load data and apply the preprocessing pipeline (proc_list).

    This is the full gold-standard pipeline that produces the exact features
    the VAE was trained on.

    Args:
        since_date: Start date for the data (YYYY-MM-DD)
        end_date: End date for the data (YYYY-MM-DD)
        proc_list_path: Path to the proc_list.proc file
        buffer_days: Extra days to load before since_date
        yaml_path: Path to handler.yaml
        verbose: Print debug information

    Returns:
        pd.DataFrame: Preprocessed data
    """
    import pickle as pkl
    from qlib.contrib.data.utils import apply_proc_list

    # Default proc_list path
    if proc_list_path is None:
        proc_list_path = "/home/guofu/Workspaces/alpha/data_ops/tasks/dwm_feature_vae/dataset/csiallx_feature2_ntrla_flag_pnlnorm/proc_list.proc"

    if verbose:
        print("Step 1: Loading raw data from handler...")

    # Load raw data
    df = load_data_from_handler(
        since_date=since_date,
        end_date=end_date,
        buffer_days=buffer_days,
        yaml_path=yaml_path,
        verbose=verbose
    )

    if verbose:
        print("\nStep 2: Loading preprocessing pipeline (proc_list)...")
        print(f" Path: {proc_list_path}")

    # Load proc_list
    with open(proc_list_path, "rb") as f:
        proc_list = pkl.load(f)

    if verbose:
        print(f" Number of processors: {len(proc_list)}")
        for i, proc in enumerate(proc_list):
            print(f" [{i}] {type(proc).__name__}")

    if verbose:
        print("\nStep 3: Applying preprocessing pipeline...")

    # Apply proc_list
    # Note: with_fit=False because we use pre-fitted parameters
    df_processed = apply_proc_list(df, proc_list=proc_list, with_fit=False)

    if verbose:
        print(f"\nProcessed data shape: {df_processed.shape}")
        print("=" * 60)

    return df_processed
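
# --- Usage sketch (illustrative) ---
# Runs the raw load plus the pickled preprocessing pipeline in one call.
# The date range is an example value; proc_list_path defaults to the
# gold-standard pickle referenced above.
#
#     features = load_data_with_proc_list(
#         since_date="2021-01-01",
#         end_date="2021-03-31",
#     )
#     # features contains the columns the VAE was trained on, provided
#     # the pickled processors are compatible with the loaded data.
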
def _fill_con_rating_nan(raw_data, verbose=True):
    """
    Fill NaN values in the con_rating_strength column before applying proc_list.

    The Diff processor creates NaN values, and FlagMarketInjector fails when
    trying to convert columns with NaN to int8. This function fills NaN in
    con_rating_strength with the column median to avoid IntCastingNaNError.

    Args:
        raw_data: SepDataFrame or DataFrame with MultiIndex columns
        verbose: Print debug info

    Returns:
        DataFrame with NaN filled in con_rating_strength, or the original
        object unchanged if the column is absent
    """
    # Check if this is a SepDataFrame (qlib's separated DataFrame)
    is_sep = hasattr(raw_data, 'to_frame') and type(raw_data).__name__ == 'SepDataFrame'

    # Convert SepDataFrame to DataFrame if needed
    df = raw_data.to_frame() if is_sep else raw_data

    # Check if con_rating_strength exists in the feature_ext group
    target_col = ('feature_ext', 'con_rating_strength')
    if target_col in df.columns:
        median_val = df[target_col].median()
        nan_count = df[target_col].isna().sum()
        if verbose:
            print(f" Filling {nan_count} NaN values in con_rating_strength with median={median_val:.4f}")
        # Create a copy and fill NaN
        df = df.copy()
        df[target_col] = df[target_col].fillna(median_val)
        if verbose:
            print(f" Verified: {df[target_col].isna().sum()} NaN remaining")
        return df

    if verbose:
        print(" con_rating_strength not found, skipping NaN fill")
    return raw_data
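
# --- Behavior sketch (illustrative) ---
# On a frame with the ('feature_ext', 'con_rating_strength') column, the
# helper replaces its NaNs with the column median and leaves every other
# column untouched:
#
#     idx = pd.MultiIndex.from_product(
#         [pd.to_datetime(["2021-01-04", "2021-01-05"]), ["SH600000"]],
#         names=["datetime", "instrument"],
#     )
#     toy = pd.DataFrame(
#         {("feature_ext", "con_rating_strength"): [1.0, float("nan")]},
#         index=idx,
#     )
#     filled = _fill_con_rating_nan(toy, verbose=False)
#     assert filled[("feature_ext", "con_rating_strength")].isna().sum() == 0
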
class FixedDiff:
    """
    Fixed Diff processor that correctly handles the :: separator column format.

    The original qlib Diff processor has a bug where it creates column names like
    "('feature_ext', 'log_size')_diff" (the string representation of a tuple)
    instead of 'feature_ext::log_size_diff' (the proper :: separator format).
    """
    def __init__(self, fields_group, suffix="diff", periods=1):
        self.fields_group = fields_group
        self.suffix = suffix
        self.periods = periods

    def __call__(self, df):
        import pandas as pd
        # Get columns for this group - handle the :: separator format
        cols = [c for c in df.columns if c.startswith(f"{self.fields_group}::")]
        df_cols = df[cols]
        cols_name = df_cols.columns
        # Apply the diff transformation per instrument
        df_cols_diff = df_cols.groupby("instrument").transform(
            lambda x: x.ffill().diff(self.periods).fillna(0.)
        )
        df_cols = pd.concat([df_cols, df_cols_diff], axis=1)
        # Name the columns to match the concat order: all originals first,
        # then all diff columns with the suffix appended (interleaving the
        # names here would mislabel the diff columns)
        new_cols = list(cols_name) + [f"{name}_{self.suffix}" for name in cols_name]
        df_cols.columns = new_cols
        df[df_cols.columns] = df_cols
        return df
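
# --- Behavior sketch (illustrative) ---
# With a single 'feature_ext::log_size' column and two trading days for one
# instrument, FixedDiff appends a 'feature_ext::log_size_diff' column
# holding the per-instrument period-over-period change:
#
#     idx = pd.MultiIndex.from_product(
#         [pd.to_datetime(["2021-01-04", "2021-01-05"]), ["SH600000"]],
#         names=["datetime", "instrument"],
#     )
#     toy = pd.DataFrame({"feature_ext::log_size": [10.0, 10.5]}, index=idx)
#     out = FixedDiff(fields_group="feature_ext")(toy)
#     # out["feature_ext::log_size_diff"] -> [0.0, 0.5]
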
class FixedColumnRemover:
    """Fixed ColumnRemover that handles the :: separator format."""
    def __init__(self, fields_group):
        self.fields_group = fields_group

    def __call__(self, df):
        cols_to_remove = [item for item in self.fields_group if item in df.columns]
        return df.drop(columns=cols_to_remove, errors='ignore')
class FixedFlagToOnehot:
    """Fixed FlagToOnehot that handles the :: separator format."""
    def __init__(self, fields_group, onehot_group, format_compact=False):
        self.fields_group = fields_group
        self.onehot_group = onehot_group
        self.format_compact = format_compact

    def __call__(self, df):
        cols = [c for c in df.columns if c.startswith(f"{self.fields_group}::")]
        for col in cols:
            industry_code = col.split('::')[1]
            new_col = f"{self.onehot_group}::{industry_code}"
            df[new_col] = df[col].astype(int)
        return df
class FixedIndusNtrlInjector:
    """Fixed IndusNtrlInjector that handles the :: separator format."""
    def __init__(self, fields_group, input_group, indus_group,
                 indus_suffix="_ntrl", ntrl_suffix="_ntrl", keep_origin=True,
                 include_indus=False, include_indus_std=False, norm_by_ntrl=False):
        self.fields_group = fields_group
        self.input_group = input_group
        self.indus_group = indus_group
        self.indus_suffix = indus_suffix
        self.ntrl_suffix = ntrl_suffix
        self.keep_origin = keep_origin
        self.include_indus = include_indus
        self.include_indus_std = include_indus_std
        self.norm_by_ntrl = norm_by_ntrl

    def __call__(self, df):
        import numpy as np

        feature_cols = [c for c in df.columns if c.startswith(f"{self.input_group}::")]
        indus_cols = [c for c in df.columns if c.startswith(f"{self.indus_group}::")]

        # Pick the primary industry column: the first one with any truthy values
        indus_assign = None
        for ic in indus_cols:
            if df[ic].any():
                indus_assign = ic
                break
        if indus_assign is None:
            return df

        # Z-score each feature within its industry group; a zero std is
        # mapped to NaN so the division never produces inf
        for feat_col in feature_cols:
            feat_name = feat_col.split('::')[1]
            grouped = df.groupby(indus_assign)[feat_col]
            indus_mean = grouped.transform('mean')
            indus_std = grouped.transform('std')
            ntrl_col = f"{self.input_group}::{feat_name}{self.ntrl_suffix}"
            df[ntrl_col] = (df[feat_col] - indus_mean) / indus_std.replace(0, np.nan)
        return df
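
# --- Behavior sketch (illustrative) ---
# For every 'input_group::<name>' column the injector adds a
# '<name>_ntrl' column holding the within-industry z-score:
#
#     toy = pd.DataFrame({
#         "feature_ext::log_size": [1.0, 3.0, 5.0],
#         "indus_flag::bank": [1, 1, 1],
#     })
#     out = FixedIndusNtrlInjector(
#         fields_group="feature_ext",
#         input_group="feature_ext",
#         indus_group="indus_flag",
#     )(toy)
#     # out["feature_ext::log_size_ntrl"] -> [-1.0, 0.0, 1.0]
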
class FixedRobustZScoreNorm:
    """Fixed RobustZScoreNorm that handles the :: separator format with trained params."""
    def __init__(self, fields_group, mean_train, std_train, clip_outlier=True, cols=None):
        self.fields_group = fields_group
        self.mean_train = mean_train
        self.std_train = std_train
        self.clip_outlier = clip_outlier
        self.cols = cols

    def __call__(self, df):
        # Collect the columns to normalize
        if isinstance(self.fields_group, list):
            cols_to_norm = []
            for grp in self.fields_group:
                cols_to_norm.extend([c for c in df.columns if c.startswith(f"{grp}::")])
        else:
            cols_to_norm = [c for c in df.columns if c.startswith(f"{self.fields_group}::")]

        # Apply normalization using the trained mean/std, matched by position
        if self.mean_train is not None and self.std_train is not None:
            for i, col in enumerate(cols_to_norm):
                if i < len(self.mean_train) and i < len(self.std_train):
                    mean_val = self.mean_train[i]
                    std_val = self.std_train[i]
                    if std_val > 0:
                        df[col] = (df[col] - mean_val) / std_val
        return df
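
# --- Caveat (worth noting) ---
# FixedRobustZScoreNorm pairs mean_train[i]/std_train[i] with the i-th
# matching column, so the column order at inference time must match the
# order the statistics were fitted in; a reordered frame would silently
# normalize with the wrong parameters. A quick sanity check (mean_train
# and std_train here are the arrays taken from the pickled processor):
#
#     n_cols = sum(c.startswith("feature_ext::") for c in df.columns)
#     assert n_cols == len(mean_train) == len(std_train)
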
class FixedFillna:
    """Fixed Fillna that handles the :: separator format."""
    def __init__(self, fields_group, fill_value=0):
        self.fields_group = fields_group
        self.fill_value = fill_value

    def __call__(self, df):
        if isinstance(self.fields_group, list):
            cols_to_fill = []
            for grp in self.fields_group:
                cols_to_fill.extend([c for c in df.columns if c.startswith(f"{grp}::")])
        else:
            cols_to_fill = [c for c in df.columns if c.startswith(f"{self.fields_group}::")]
        df[cols_to_fill] = df[cols_to_fill].fillna(self.fill_value)
        return df
class FixedFlagMarketInjector:
    """Fixed FlagMarketInjector that handles the :: separator format."""
    def __init__(self, fields_group, vocab_size=2):
        self.fields_group = fields_group
        self.vocab_size = vocab_size

    def __call__(self, df):
        cols = [c for c in df.columns if c.startswith(f"{self.fields_group}::")]
        for col in cols:
            df[col] = df[col].astype('int8')
        return df
class FixedFlagSTInjector:
    """Fixed FlagSTInjector that handles the :: separator format."""
    def __init__(self, fields_group, st_group="st_flag", col_name="IsST"):
        self.fields_group = fields_group
        self.st_group = st_group
        self.col_name = col_name

    def __call__(self, df):
        cols = [c for c in df.columns if c.startswith(f"{self.st_group}::")]
        for col in cols:
            df[col] = df[col].astype('int8')
        return df
def convert_columns_to_double_colon(df):
    """
    Convert MultiIndex tuple columns to the '::' separator string format.

    This is needed because the proc_list was trained on data with column names
    like 'feature_ext::log_size', while our loader produces MultiIndex tuples
    such as ('feature_ext', 'log_size').

    Args:
        df: DataFrame with MultiIndex tuple columns

    Returns:
        DataFrame with string columns using the '::' separator
    """
    if not isinstance(df.columns, pd.MultiIndex):
        return df
    # Create the new column names with the :: separator
    new_columns = [f"{grp}::{col}" for grp, col in df.columns]
    df_copy = df.copy()
    df_copy.columns = new_columns
    return df_copy
def convert_columns_from_double_colon(df):
    """
    Convert '::' separator string columns back to MultiIndex tuple format.

    Args:
        df: DataFrame with '::' separator string columns

    Returns:
        DataFrame with MultiIndex tuple columns
    """
    # Already tuple-style columns: nothing to convert
    if isinstance(df.columns, pd.MultiIndex):
        return df
    # Only convert if at least one column uses the :: separator
    has_double_colon = any(isinstance(c, str) and '::' in c for c in df.columns)
    if not has_double_colon:
        return df
    # Convert to MultiIndex, splitting on the first '::' only
    new_columns = [tuple(c.split('::', 1)) for c in df.columns]
    df_copy = df.copy()
    df_copy.columns = pd.MultiIndex.from_tuples(new_columns)
    return df_copy
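
# --- Round-trip sketch (illustrative) ---
# The two converters are inverses on well-formed group::name columns:
#
#     toy = pd.DataFrame(
#         [[1.0]],
#         columns=pd.MultiIndex.from_tuples([("feature_ext", "log_size")]),
#     )
#     flat = convert_columns_to_double_colon(toy)
#     # flat.columns -> ['feature_ext::log_size']
#     back = convert_columns_from_double_colon(flat)
#     # back.columns -> MultiIndex [('feature_ext', 'log_size')]
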
def load_and_dump_data(
    since_date: str,
    end_date: str,
    output_dir: Optional[str] = None,
    proc_list_path: Optional[str] = None,
    buffer_days: int = 20,
    yaml_path: Optional[str] = None,
    verbose: bool = True,
    skip_proc_list: bool = False,
    fill_con_rating_nan: bool = True
) -> tuple:
    """
    Load data and dump both the raw (before proc_list) and processed
    (after proc_list) versions.

    This function saves:
    1. Raw data from the handler (before applying the preprocessing pipeline)
    2. Processed data (after applying proc_list) - if not skipped and if successful

    Note: The proc_list may fail due to compatibility issues with the data.
    In that case, only the raw data is saved.

    Args:
        since_date: Start date for the data (YYYY-MM-DD)
        end_date: End date for the data (YYYY-MM-DD)
        output_dir: Output directory for pickle files (default: the data/ folder)
        proc_list_path: Path to the proc_list.proc file
        buffer_days: Extra days to load before since_date
        yaml_path: Path to handler.yaml
        verbose: Print debug information
        skip_proc_list: If True, skip applying proc_list entirely
        fill_con_rating_nan: If True, fill NaN in con_rating_strength before proc_list

    Returns:
        tuple: (raw_df, processed_df or None)
    """
    import pickle as pkl
    from qlib.contrib.data.utils import apply_proc_list

    # Default output directory (the data/ folder in the project)
    if output_dir is None:
        output_dir = PROJECT_DIR / "data"
    else:
        output_dir = Path(output_dir)

    # Default proc_list path
    if proc_list_path is None:
        proc_list_path = "/home/guofu/Workspaces/alpha/data_ops/tasks/dwm_feature_vae/dataset/csiallx_feature2_ntrla_flag_pnlnorm/proc_list.proc"

    if verbose:
        print("=" * 60)
        print("Loading and dumping data")
        print("=" * 60)
        print(f" Output directory: {output_dir}")
        print()

    # Ensure the output directory exists
    output_dir.mkdir(parents=True, exist_ok=True)

    # Convert dates
    if isinstance(since_date, str):
        since_date = pd.to_datetime(since_date)
    if isinstance(end_date, str):
        end_date = pd.to_datetime(end_date)

    # Step 1: Load raw data from the handler (with buffer period, NOT filtered)
    if verbose:
        print("Step 1: Loading raw data from handler (with buffer period)...")
        print(f" Requested range: {since_date.date()} to {end_date.date()}")
        print(f" Buffer days: {buffer_days}")

    # Load yaml config
    yaml_path = Path(yaml_path or HANDLER_YAML_PATH)
    if not yaml_path.exists():
        raise FileNotFoundError(f"handler.yaml not found at {yaml_path}")
    yaml_loader = YAML(typ='safe', pure=True)
    with open(yaml_path) as f:
        config = yaml_loader.load(f)

    # Initialize qlib
    from qlib.workflow.cli import sys_config
    config_path = "qlib.contrib.data.config"
    sys_config(config, config_path)
    qlib.init(**config.get("qlib_init"))

    # Calculate the load start (with buffer for diff calculations)
    load_start = since_date - pd.Timedelta(days=buffer_days)

    # Prepare placeholder values
    placeholder_value = {
        "<LOAD_START>": load_start,
        "<LOAD_END>": end_date,
    }
    # Load placeholders from the handler module
    try:
        placeholder_value.update(
            load_placehorder_from_module(config["handler"])
        )
    except Exception as e:
        if verbose:
            print(f" Note: Could not load placeholders from handler module: {e}")

    # Fill placeholders in config
    config = fill_placeholder(config, placeholder_value)

    # Initialize the handler and load data
    handler = init_instance_by_config(config["handler"])
    raw_data = handler._data  # Keep as SepDataFrame

    if verbose:
        tmp_df = raw_data.to_frame() if hasattr(raw_data, 'to_frame') else raw_data
        print(f" Loaded data shape (with buffer): {tmp_df.shape}")
        print(f" Data index levels: {tmp_df.index.names}")

    # Step 2: Dump raw data (before proc_list, filtered to the requested range)
    if isinstance(raw_data.index, pd.MultiIndex):
        raw_data_filtered = raw_data.loc(axis=0)[slice(since_date, end_date), :]
    else:
        raw_data_filtered = raw_data.loc[slice(since_date, end_date)]
    if hasattr(raw_data_filtered, 'to_frame'):
        raw_df = raw_data_filtered.to_frame()
    else:
        raw_df = raw_data_filtered

    raw_output_path = output_dir / f"raw_data_{since_date.strftime('%Y%m%d')}_{end_date.strftime('%Y%m%d')}.pkl"
    if verbose:
        print(f"\nStep 2: Dumping raw data (filtered) to {raw_output_path}...")
        print(f" Raw data shape (filtered): {raw_df.shape}")
    with open(raw_output_path, "wb") as f:
        pkl.dump(raw_df, f)
    if verbose:
        print(f" Saved: {raw_output_path}")

    # Skip proc_list if requested
    if skip_proc_list:
        if verbose:
            print("\nStep 3: Skipping proc_list as requested")
            print()
            print("=" * 60)
            print("Summary:")
            print(f" Raw data: {raw_df.shape} -> {raw_output_path}")
            print(" Processed data: SKIPPED")
            print("=" * 60)
        return raw_df, None
    # Step 3: Load the preprocessing pipeline (proc_list)
    if verbose:
        print("\nStep 3: Loading preprocessing pipeline (proc_list)...")
        print(f" Path: {proc_list_path}")
    with open(proc_list_path, "rb") as f:
        proc_list = pkl.load(f)
    if verbose:
        print(f" Number of processors: {len(proc_list)}")
        for i, proc in enumerate(proc_list):
            print(f" [{i}] {type(proc).__name__}")

    # Step 4: Apply proc_list (BEFORE filtering, on data with the buffer period)
    if verbose:
        print("\nStep 4: Applying preprocessing pipeline (on data with buffer)...")

    # Convert SepDataFrame to DataFrame for processing
    if hasattr(raw_data, 'to_frame'):
        df_for_proc = raw_data.to_frame()
    else:
        df_for_proc = raw_data.copy()

    # Fill NaN in con_rating_strength if requested (workaround for IntCastingNaNError)
    if fill_con_rating_nan:
        if verbose:
            print(" Pre-processing: Filling NaN in con_rating_strength...")
        df_for_proc = _fill_con_rating_nan(df_for_proc, verbose=verbose)

    # Convert columns from MultiIndex tuples to the :: separator format;
    # the proc_list was trained on data with the 'feature_ext::log_size' format
    df_for_proc = convert_columns_to_double_colon(df_for_proc)
    if verbose:
        print(f" Converted columns: {list(df_for_proc.columns[:5])}...")

    # Swap each known-problematic processor for its Fixed* counterpart and
    # keep all other processors from the pickle. The Fixed* classes handle
    # the :: separator format and fix the column naming bug.
    fixed_proc_list = []
    for proc in proc_list:
        proc_name = type(proc).__name__
        if proc_name == 'Diff':
            if verbose:
                print(f" Replacing Diff with FixedDiff (fields_group={proc.fields_group})")
            fixed_proc_list.append(FixedDiff(
                fields_group=proc.fields_group,
                suffix=proc.suffix,
                periods=proc.periods
            ))
        elif proc_name == 'FlagMarketInjector':
            if verbose:
                print(" Replacing FlagMarketInjector with FixedFlagMarketInjector")
            fixed_proc_list.append(FixedFlagMarketInjector(
                fields_group=proc.fields_group,
                vocab_size=getattr(proc, 'vocab_size', 2)
            ))
        elif proc_name == 'FlagSTInjector':
            if verbose:
                print(" Replacing FlagSTInjector with FixedFlagSTInjector")
            fixed_proc_list.append(FixedFlagSTInjector(
                fields_group=proc.fields_group,
                st_group=getattr(proc, 'st_group', 'st_flag'),
                col_name=getattr(proc, 'col_name', 'IsST')
            ))
        elif proc_name == 'ColumnRemover':
            if verbose:
                print(" Replacing ColumnRemover with FixedColumnRemover")
            fixed_proc_list.append(FixedColumnRemover(fields_group=proc.fields_group))
        elif proc_name == 'FlagToOnehot':
            if verbose:
                print(" Replacing FlagToOnehot with FixedFlagToOnehot")
            fixed_proc_list.append(FixedFlagToOnehot(
                fields_group=proc.fields_group,
                onehot_group=getattr(proc, 'onehot_group', 'industry')
            ))
        elif proc_name == 'IndusNtrlInjector':
            if verbose:
                print(f" Replacing IndusNtrlInjector with FixedIndusNtrlInjector (fields_group={proc.fields_group})")
            fixed_proc_list.append(FixedIndusNtrlInjector(
                fields_group=proc.fields_group,
                input_group=getattr(proc, 'input_group', proc.fields_group),
                indus_group=getattr(proc, 'indus_group', 'indus_flag'),
                ntrl_suffix=getattr(proc, 'ntrl_suffix', '_ntrl')
            ))
        elif proc_name == 'RobustZScoreNorm':
            if verbose:
                print(" Replacing RobustZScoreNorm with FixedRobustZScoreNorm (using trained mean/std)")
            fixed_proc_list.append(FixedRobustZScoreNorm(
                fields_group=proc.fields_group,
                mean_train=getattr(proc, 'mean_train', None),
                std_train=getattr(proc, 'std_train', None),
                clip_outlier=getattr(proc, 'clip_outlier', True)
            ))
        elif proc_name == 'Fillna':
            if verbose:
                print(" Replacing Fillna with FixedFillna")
            fixed_proc_list.append(FixedFillna(
                fields_group=proc.fields_group,
                fill_value=getattr(proc, 'fill_value', 0)
            ))
        else:
            # Keep the original processor for unknown types
            fixed_proc_list.append(proc)
    try:
        # Apply the fixed proc_list
        if verbose:
            print("\n Applying fixed preprocessing pipeline...")
        processed_data = apply_proc_list(df_for_proc, proc_list=fixed_proc_list, with_fit=False)
        if verbose:
            print(f" Processed data shape: {processed_data.shape}")
            print(f" Processed columns sample: {list(processed_data.columns[:5])}...")

        # Convert columns back to MultiIndex tuples for consistency
        processed_data = convert_columns_from_double_colon(processed_data)

        # Now filter to the requested date range
        if isinstance(processed_data.index, pd.MultiIndex):
            processed_data_filtered = processed_data.loc(axis=0)[slice(since_date, end_date), :]
        else:
            processed_data_filtered = processed_data.loc[slice(since_date, end_date)]
        if verbose:
            print(f" Processed data shape (filtered): {processed_data_filtered.shape}")

        # Step 5: Dump the processed data (after proc_list)
        processed_output_path = output_dir / f"processed_data_{since_date.strftime('%Y%m%d')}_{end_date.strftime('%Y%m%d')}.pkl"
        if verbose:
            print(f"\nStep 5: Dumping processed data to {processed_output_path}...")
        with open(processed_output_path, "wb") as f:
            pkl.dump(processed_data_filtered, f)
        if verbose:
            print(f" Saved: {processed_output_path}")
            print()
            print("=" * 60)
            print("Summary:")
            print(f" Raw data: {raw_df.shape} -> {raw_output_path}")
            print(f" Processed data: {processed_data_filtered.shape} -> {processed_output_path}")
            print("=" * 60)
        return raw_df, processed_data_filtered
    except Exception as e:
        if verbose:
            print(f"\nERROR applying proc_list: {e}")
            print("Only raw data was saved. The proc_list may have compatibility issues")
            print("with this data (e.g., NaN values in columns that need int8 conversion).")
            print()
            print("=" * 60)
            print("Summary:")
            print(f" Raw data: {raw_df.shape} -> {raw_output_path}")
            print(f" Processed data: FAILED ({type(e).__name__})")
            print("=" * 60)
        return raw_df, None
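
# --- Usage sketch (illustrative) ---
# Writes raw_data_YYYYMMDD_YYYYMMDD.pkl and, if the pipeline succeeds,
# processed_data_YYYYMMDD_YYYYMMDD.pkl into the output directory. The
# date range below is an example value.
#
#     raw_df, processed_df = load_and_dump_data(
#         since_date="2021-01-01",
#         end_date="2021-03-31",
#         skip_proc_list=False,
#     )
#     if processed_df is None:
#         # The pipeline failed (or was skipped); only the raw dump exists.
#         ...
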
# Quick smoke test when run as a script
if __name__ == "__main__":
    test_since = "2019-01-01"
    test_end = "2019-01-31"
    print(f"Testing data loader with date range: {test_since} to {test_end}")
    print()
    try:
        df = load_data_from_handler(
            since_date=test_since,
            end_date=test_end,
            buffer_days=20,
            verbose=True
        )
        print(f"\nSuccess! Loaded {len(df)} rows")
        print(f"Date range in data: {df.index.get_level_values('datetime').min()} to {df.index.get_level_values('datetime').max()}")
    except Exception as e:
        print(f"\nError: {e}")
        import traceback
        traceback.print_exc()