From 586b16a6faa2ffb2c2c3c36bcba65dc3946fe3e6 Mon Sep 17 00:00:00 2001
From: guofu
Date: Fri, 27 Feb 2026 21:46:49 +0800
Subject: [PATCH] Add qlib_loader.py with configurable date range for data
loading
## New Files
- src/qlib_loader.py - Qlib data loader utility with:
- load_data_from_handler() - Load data with configurable start/end dates
- load_data_with_proc_list() - Full pipeline with preprocessing
- load_and_dump_data() - Dump raw and processed data to pickle files
- Fixed processor implementations (FixedDiff, FixedColumnRemover, etc.)
that handle :: separator column format correctly
- NaN filling workaround for con_rating_strength column
- config/handler.yaml - Modified handler config with `<LOAD_START>` and
`<LOAD_END>` placeholders instead of hardcoded `<LOAD_START>` and `<TODAY>`
- data/.gitignore - Ignore pickle and parquet data files
## Updated
- README.md - Documentation for data loading with configurable date range
## Key Changes
1. Fixed Diff processor bug: Column names now correctly use :: separator
format (e.g., 'feature_ext::log_size_diff') instead of malformed
string representations of tuples
2. Preserved trained parameters: Fixed processors use mean_train/std_train
from original proc_list pickle for RobustZScoreNorm
3. Configurable end date: handler.yaml now respects user-specified end
dates instead of always loading until today
## Tested
- Successfully dumps raw data (before proc_list) to pickle files
- Successfully applies fixed proc_list and dumps processed data
- Both 2019-01 and 2025-01 data processed without errors
---
stock_1d/d033/alpha158_beta/README.md | 229 +++++
.../d033/alpha158_beta/config/handler.yaml | 34 +
stock_1d/d033/alpha158_beta/data/.gitignore | 8 +
.../d033/alpha158_beta/src/qlib_loader.py | 876 ++++++++++++++++++
4 files changed, 1147 insertions(+)
create mode 100644 stock_1d/d033/alpha158_beta/README.md
create mode 100644 stock_1d/d033/alpha158_beta/config/handler.yaml
create mode 100644 stock_1d/d033/alpha158_beta/data/.gitignore
create mode 100644 stock_1d/d033/alpha158_beta/src/qlib_loader.py
diff --git a/stock_1d/d033/alpha158_beta/README.md b/stock_1d/d033/alpha158_beta/README.md
new file mode 100644
index 0000000..a431547
--- /dev/null
+++ b/stock_1d/d033/alpha158_beta/README.md
@@ -0,0 +1,229 @@
+# Alpha158 0_7 vs 0_7_beta Prediction Comparison
+
+This directory contains a workflow for comparing Alpha158 version 0_7 (original) vs 0_7_beta (enhanced with VAE embeddings) predictions.
+
+## Overview
+
+The goal is to evaluate whether the beta version of Alpha158 factors produces better predictions than the original 0_7 version when used with the d033 prediction model.
+
+## Directory Structure
+
+```
+stock_1d/d033/alpha158_beta/
+├── README.md # This file
+├── config.yaml # VAE model configuration
+├── pipeline.py # Main orchestration script
+├── scripts/ # Core pipeline scripts
+│ ├── generate_beta_embedding.py # Generate VAE embeddings from beta factors
+│ ├── generate_returns.py # Generate actual returns from kline data
+│ ├── fetch_predictions.py # Fetch original predictions from DolphinDB
+│ ├── predict_with_embedding.py # Generate predictions using beta embeddings
+│ └── compare_predictions.py # Compare 0_7 vs 0_7_beta predictions
+├── src/ # Source modules
+│ └── qlib_loader.py # Qlib data loader with configurable date range
+├── config/ # Configuration files
+│ └── handler.yaml # Modified handler with configurable end date
+└── data/ # Data files (gitignored)
+ ├── embedding_0_7_beta.parquet
+ ├── predictions_beta_embedding.parquet
+ ├── original_predictions_0_7.parquet
+ ├── actual_returns.parquet
+ ├── raw_data_*.pkl # Raw data before preprocessing
+ └── processed_data_*.pkl # Processed data after preprocessing
+```
+
+## Data Loading with Configurable Date Range
+
+### handler.yaml Modification
+
+The original `handler.yaml` uses a `<TODAY>` placeholder which always loads data until today's date. The modified version in `config/handler.yaml` uses a `<LOAD_END>` placeholder that can be controlled via arguments:
+
+```yaml
+# Original (always loads until today)
+load_start: &load_start <LOAD_START>
+load_end: &load_end <TODAY>
+
+# Modified (configurable end date)
+load_start: &load_start <LOAD_START>
+load_end: &load_end <LOAD_END>
+```
+
+### Using qlib_loader.py
+
+```python
+from stock_1d.d033.alpha158_beta.src.qlib_loader import (
+ load_data_from_handler,
+ load_data_with_proc_list,
+ load_and_dump_data
+)
+
+# Load data with configurable date range
+df = load_data_from_handler(
+ since_date="2019-01-01",
+ end_date="2019-01-31",
+ buffer_days=20, # Extra days for diff calculations
+ verbose=True
+)
+
+# Load and apply preprocessing pipeline
+df_processed = load_data_with_proc_list(
+ since_date="2019-01-01",
+ end_date="2019-01-31",
+ proc_list_path="/path/to/proc_list.proc",
+ buffer_days=20
+)
+
+# Load and dump both raw and processed data to pickle files
+raw_df, processed_df = load_and_dump_data(
+ since_date="2019-01-01",
+ end_date="2019-01-31",
+ output_dir="data/",
+ fill_con_rating_nan=True, # Fill NaN in con_rating_strength column
+ verbose=True
+)
+```
+
+### Key Features
+
+1. **Configurable end date**: Unlike the original handler.yaml, the end date is now respected
+2. **Buffer period handling**: Automatically loads extra days before `since_date` for diff calculations
+3. **NaN handling**: Optional filling of NaN values in `con_rating_strength` column
+4. **Dual output**: Saves both raw (before proc_list) and processed (after proc_list) data
+
+### Processor Fixes
+
+The `qlib_loader.py` includes fixed implementations of qlib processors that correctly handle the `::` separator column format:
+
+- `FixedDiff` - Fixes column naming bug (creates proper `feature::col_diff` names)
+- `FixedColumnRemover` - Handles `::` separator format
+- `FixedRobustZScoreNorm` - Uses trained `mean_train`/`std_train` parameters from pickle
+- `FixedIndusNtrlInjector` - Industry neutralization with `::` format
+- Other fixed processors for the full preprocessing pipeline
+
+All fixed processors preserve the trained parameters from the original proc_list pickle.
+
+## Workflow
+
+### 1. Generate Beta Embeddings
+
+Generate VAE embeddings from the alpha158_0_7_beta factors:
+
+```bash
+python scripts/generate_beta_embedding.py --start-date 2019-01-01 --end-date 2020-11-30
+```
+
+This loads data from Parquet, applies the full feature transformation pipeline, and encodes with the VAE model.
+
+Output: `data/embedding_0_7_beta.parquet`
+
+### 2. Fetch Original Predictions
+
+Fetch the original 0_7 predictions from DolphinDB:
+
+```bash
+python scripts/fetch_predictions.py --start-date 2019-01-01 --end-date 2020-11-30
+```
+
+Output: `data/original_predictions_0_7.parquet`
+
+### 3. Generate Predictions with Beta Embeddings
+
+Use the d033 model to generate predictions from the beta embeddings:
+
+```bash
+python scripts/predict_with_embedding.py --start-date 2019-01-01 --end-date 2020-11-30
+```
+
+Output: `data/predictions_beta_embedding.parquet`
+
+### 4. Generate Actual Returns
+
+Generate actual returns from kline data for IC calculation:
+
+```bash
+python scripts/generate_returns.py
+```
+
+Output: `data/actual_returns.parquet`
+
+### 5. Compare Predictions
+
+Compare the 0_7 vs 0_7_beta predictions:
+
+```bash
+python scripts/compare_predictions.py
+```
+
+This calculates:
+- Prediction correlation (Pearson and Spearman)
+- Daily correlation statistics
+- IC metrics (mean, std, IR)
+- RankIC metrics
+- Top-tier returns (top 10%)
+
+## Quick Start
+
+Run the full pipeline:
+
+```bash
+python pipeline.py --start-date 2019-01-01 --end-date 2020-11-30
+```
+
+Or run individual steps:
+
+```bash
+# Step 1: Generate embeddings
+python scripts/generate_beta_embedding.py --start-date 2019-01-01 --end-date 2020-11-30
+
+# Step 2: Fetch original predictions
+python scripts/fetch_predictions.py --start-date 2019-01-01 --end-date 2020-11-30
+
+# Step 3: Generate beta predictions
+python scripts/predict_with_embedding.py
+
+# Step 4: Generate returns
+python scripts/generate_returns.py
+
+# Step 5: Compare
+python scripts/compare_predictions.py
+```
+
+## Data Dependencies
+
+### Input Data (from Parquet)
+
+- `/data/parquet/dataset/stg_1day_wind_alpha158_0_7_beta_1D/` - Alpha158 beta factors
+- `/data/parquet/dataset/stg_1day_wind_kline_adjusted_1D/` - Market data (kline)
+- `/data/parquet/dataset/stg_1day_gds_indus_flag_cc1_1D/` - Industry flags
+
+### Models
+
+- `/home/guofu/Workspaces/alpha/data_ops/tasks/dwm_feature_vae/model/csiallx_feature2_ntrla_flag_pnlnorm_vae4_dim32a_beta0001/module.pt` - VAE encoder
+- `/home/guofu/Workspaces/alpha/data_ops/tasks/app_longsignal/model/host140_exp20_d033/module.pt` - d033 prediction model
+
+### DolphinDB
+
+- Table: `dfs://daily_stock_run_multicast/app_1day_multicast_longsignal_port`
+- Version: `host140_exp20_d033`
+
+## Key Metrics
+
+The comparison script outputs:
+
+| Metric | Description |
+|--------|-------------|
+| Pearson Correlation | Overall correlation between 0_7 and beta predictions |
+| Spearman Correlation | Rank correlation between predictions |
+| Daily Correlation | Mean and std of daily correlations |
+| IC Mean | Average information coefficient |
+| IC Std | Standard deviation of IC |
+| IC IR | Information ratio (IC Mean / IC Std) |
+| RankIC | Spearman correlation with returns |
+| Top-tier Return | Average return of top 10% predictions |
+
+## Notes
+
+- All scripts can be run from the `alpha158_beta/` directory
+- Scripts use relative paths (`../data/`) to locate data files
+- The VAE model expects 341 input features after the transformation pipeline
+- The d033 model expects 32-dimensional embeddings with a 40-day lookback window
diff --git a/stock_1d/d033/alpha158_beta/config/handler.yaml b/stock_1d/d033/alpha158_beta/config/handler.yaml
new file mode 100644
index 0000000..4f28479
--- /dev/null
+++ b/stock_1d/d033/alpha158_beta/config/handler.yaml
@@ -0,0 +1,34 @@
+qlib_init:
+ provider_uri: "/home/guofu/.qlib/data_ops/target"
+ region: cn
+
+ddb_config: &ddb_config
+ host: 192.168.1.146
+ port: 8848
+ username: "admin"
+ password: "123456"
+
+# Load date range - these placeholders should be replaced at runtime
+# LOAD_START = since_date - buffer_days (e.g., 20 days for diff calculation)
+# LOAD_END = end_date (user-specified, NOT today's date)
+load_start: &load_start <LOAD_START>
+load_end: &load_end <LOAD_END>
+
+market: &market csiallx
+
+data_handler_config: &data_handler_config
+ start_time: *load_start
+ end_time: *load_end
+ instruments: *market
+ ddb_config: *ddb_config
+ handler_list:
+ -
+ -
+ -
+ -
+ -
+
+handler:
+ class: AggHandler
+ module_path: qlib.contrib.data.agg_handler
+ kwargs: *data_handler_config
diff --git a/stock_1d/d033/alpha158_beta/data/.gitignore b/stock_1d/d033/alpha158_beta/data/.gitignore
new file mode 100644
index 0000000..0bf1a61
--- /dev/null
+++ b/stock_1d/d033/alpha158_beta/data/.gitignore
@@ -0,0 +1,8 @@
+# Ignore all pickle and parquet data files
+*.pkl
+*.parquet
+*.npy
+*.npz
+
+# Keep the .gitignore itself
+!.gitignore
diff --git a/stock_1d/d033/alpha158_beta/src/qlib_loader.py b/stock_1d/d033/alpha158_beta/src/qlib_loader.py
new file mode 100644
index 0000000..5909bbe
--- /dev/null
+++ b/stock_1d/d033/alpha158_beta/src/qlib_loader.py
@@ -0,0 +1,876 @@
+#!/usr/bin/env python
+"""
+Qlib Loader Utility - Load data using the gold-standard handler.yaml configuration.
+
+This module provides a wrapper around qlib's AggHandler that allows specifying
+both start and end dates for data loading, unlike the original handler.yaml
+which always loads until today.
+"""
+
+import os
+import sys
+import pprint
+import datetime
+from pathlib import Path
+from typing import Optional, Dict, Any
+
+import pandas as pd
+from ruamel.yaml import YAML
+
+# NumPy 2.0 compatibility: np.NaN was removed in NumPy 2.0
+# This must be set BEFORE importing qlib modules that use np.NaN
+import numpy as np
+if not hasattr(np, 'NaN'):
+ np.NaN = np.nan
+
+# Add qlib imports
+import qlib
+from qlib.utils import (
+ init_instance_by_config,
+ fill_placeholder
+)
+from qlib.contrib.utils import load_placehorder_from_module
+
+
+# Path to the modified handler.yaml
+# qlib_loader.py is at: stock_1d/d033/alpha158_beta/src/qlib_loader.py
+# handler.yaml is at: stock_1d/d033/alpha158_beta/config/handler.yaml
+CURRENT_DIR = Path(__file__).parent # src/
+PROJECT_DIR = CURRENT_DIR.parent # alpha158_beta/
+HANDLER_YAML_PATH = PROJECT_DIR / "config" / "handler.yaml"
+
+# Original handler.yaml path (for reference)
+ORIGINAL_HANDLER_YAML_PATH = "/home/guofu/Workspaces/alpha/data_ops/tasks/dwm_feature_vae/dataset/csiallx_feature2_ntrla_flag_pnlnorm/handler.yaml"
+
+
+def load_data_from_handler(
+ since_date: str,
+ end_date: str,
+ buffer_days: int = 20,
+ yaml_path: Optional[str] = None,
+ verbose: bool = True
+) -> pd.DataFrame:
+ """
+ Load data using qlib's AggHandler with configurable date range.
+
+ Args:
+ since_date: Start date for the data (YYYY-MM-DD or datetime-like)
+ end_date: End date for the data (YYYY-MM-DD or datetime-like)
+ buffer_days: Extra days to load before since_date for diff calculations (default: 20)
+ yaml_path: Path to handler.yaml (default: uses the modified version in config/)
+ verbose: Print debug information
+
+ Returns:
+ pd.DataFrame: Loaded data with MultiIndex (datetime, instrument)
+
+ Notes:
+ - The buffer_days is needed because the Diff processor calculates
+ period-over-period changes, which requires looking back in time.
+ - After loading, you should filter the result to [since_date, end_date]
+ to remove the buffer period data.
+ """
+ # Resolve yaml path
+ if yaml_path is None:
+ yaml_path = HANDLER_YAML_PATH
+
+ yaml_path = Path(yaml_path)
+ if not yaml_path.exists():
+ raise FileNotFoundError(f"handler.yaml not found at {yaml_path}")
+
+ # Convert since_date to datetime if string
+ if isinstance(since_date, str):
+ since_date = pd.to_datetime(since_date)
+ if isinstance(end_date, str):
+ end_date = pd.to_datetime(end_date)
+
+ # Calculate load start (with buffer for diff calculations)
+ load_start = since_date - pd.Timedelta(days=buffer_days)
+
+ if verbose:
+ print("=" * 60)
+ print("Loading data from handler.yaml")
+ print("=" * 60)
+ print(f" Requested range: {since_date.date()} to {end_date.date()}")
+ print(f" Buffer days: {buffer_days}")
+ print(f" Actual load range: {load_start.date()} to {end_date.date()}")
+ print(f" Handler yaml: {yaml_path}")
+
+ # Load yaml config
+ yaml_loader = YAML(typ='safe', pure=True)
+ with open(yaml_path) as f:
+ config = yaml_loader.load(f)
+
+ # Initialize qlib
+ from qlib.workflow.cli import sys_config
+ config_path = "qlib.contrib.data.config"
+ sys_config(config, config_path)
+
+ qlib.init(**config.get("qlib_init"))
+
+ # Prepare placeholder values
+ placeholder_value = {
+ "": load_start,
+ "": end_date,
+ }
+
+ # Also load placeholders from handler module if available
+ try:
+ placeholder_value.update(
+ load_placehorder_from_module(config["handler"])
+ )
+ except Exception as e:
+ if verbose:
+ print(f" Note: Could not load placeholders from handler module: {e}")
+
+ # Fill placeholders in config
+ config = fill_placeholder(config, placeholder_value)
+
+ if verbose:
+ print("\nHandler config after filling placeholders:")
+ pprint.pprint(config)
+
+ # Initialize handler and load data
+ handler = init_instance_by_config(config["handler"])
+
+ # Return the underlying data
+ data = handler._data
+
+ if verbose:
+ # SepDataFrame doesn't have .shape, convert to DataFrame first
+ if hasattr(data, 'to_frame'):
+ data_df = data.to_frame() # Convert SepDataFrame to DataFrame
+ else:
+ data_df = data
+ print(f"\nLoaded data shape: {data_df.shape}")
+ print(f"Data index levels: {data_df.index.names}")
+ print(f"Data columns: {list(data_df.columns)[:20]}...")
+
+ # Filter to requested date range
+ print(f"\nFiltering to requested range: {since_date.date()} to {end_date.date()}")
+
+ # Filter to the requested date range (remove buffer period)
+ if isinstance(data.index, pd.MultiIndex):
+ data = data.loc(axis=0)[slice(since_date, end_date), :]
+ else:
+ data = data.loc[slice(since_date, end_date)]
+
+ if verbose:
+ # Again handle SepDataFrame
+ if hasattr(data, 'to_frame'):
+ data_df = data.to_frame()
+ else:
+ data_df = data
+ print(f"Filtered data shape: {data_df.shape}")
+ print("=" * 60)
+
+ return data
+
+
+def load_data_with_proc_list(
+    since_date: str,
+    end_date: str,
+    proc_list_path: Optional[str] = None,
+    buffer_days: int = 20,
+    yaml_path: Optional[str] = None,
+    verbose: bool = True
+) -> pd.DataFrame:
+    """
+    Load data and apply the preprocessing pipeline (proc_list).
+
+    This is the full gold-standard pipeline that produces the exact features
+    the VAE was trained on.
+
+    Args:
+        since_date: Start date for the data (YYYY-MM-DD)
+        end_date: End date for the data (YYYY-MM-DD)
+        proc_list_path: Path to proc_list.proc file
+        buffer_days: Extra days to load before since_date
+        yaml_path: Path to handler.yaml
+        verbose: Print debug information
+
+    Returns:
+        pd.DataFrame: Preprocessed data
+
+    NOTE(review): load_data_from_handler() trims the buffer period before
+    returning, so the proc_list's Diff processors see no lookback rows at
+    the start of the window here. load_and_dump_data() instead applies the
+    proc_list BEFORE trimming -- confirm which behaviour is intended.
+    """
+    # Local imports: pickle for the serialized proc_list, apply_proc_list
+    # to run the processor chain over the loaded frame.
+    import pickle as pkl
+    from qlib.contrib.data.utils import apply_proc_list
+
+    # Default proc_list path
+    if proc_list_path is None:
+        proc_list_path = "/home/guofu/Workspaces/alpha/data_ops/tasks/dwm_feature_vae/dataset/csiallx_feature2_ntrla_flag_pnlnorm/proc_list.proc"
+
+    if verbose:
+        print("Step 1: Loading raw data from handler...")
+
+    # Load raw data
+    df = load_data_from_handler(
+        since_date=since_date,
+        end_date=end_date,
+        buffer_days=buffer_days,
+        yaml_path=yaml_path,
+        verbose=verbose
+    )
+
+    if verbose:
+        print("\nStep 2: Loading preprocessing pipeline (proc_list)...")
+        print(f"  Path: {proc_list_path}")
+
+    # Load proc_list
+    # SECURITY NOTE: pickle.load executes arbitrary code on load -- only
+    # point proc_list_path at trusted files.
+    with open(proc_list_path, "rb") as f:
+        proc_list = pkl.load(f)
+
+    if verbose:
+        print(f"  Number of processors: {len(proc_list)}")
+        for i, proc in enumerate(proc_list):
+            print(f"    [{i}] {type(proc).__name__}")
+
+    if verbose:
+        print("\nStep 3: Applying preprocessing pipeline...")
+
+    # Apply proc_list
+    # Note: with_fit=False because we use pre-fitted parameters
+    df_processed = apply_proc_list(df, proc_list=proc_list, with_fit=False)
+
+    if verbose:
+        print(f"\nProcessed data shape: {df_processed.shape}")
+        print("=" * 60)
+
+    return df_processed
+
+
+def _fill_con_rating_nan(raw_data, verbose: bool = True):
+    """
+    Fill NaN values in con_rating_strength column before applying proc_list.
+
+    The Diff processor creates NaN values, and FlagMarketInjector fails when
+    trying to convert columns with NaN to int8. This function fills NaN in
+    con_rating_strength with the column median to avoid IntCastingNaNError.
+
+    Args:
+        raw_data: SepDataFrame or DataFrame with MultiIndex columns
+        verbose: Print debug info
+
+    Returns:
+        When the column is present: a plain DataFrame copy with the NaNs
+        filled (a SepDataFrame input is converted via to_frame(), so the
+        return type may differ from the input type).
+        When the column is absent: the input object, unchanged.
+    """
+    # Check if this is a SepDataFrame (qlib's separated DataFrame)
+    is_sep = hasattr(raw_data, 'to_frame') and type(raw_data).__name__ == 'SepDataFrame'
+
+    # Convert SepDataFrame to DataFrame if needed
+    if is_sep:
+        df = raw_data.to_frame()
+    else:
+        df = raw_data
+
+    # Check if con_rating_strength exists in feature_ext group
+    # (tuple lookup: this runs BEFORE columns are converted to '::' strings)
+    target_col = ('feature_ext', 'con_rating_strength')
+    if target_col in df.columns:
+        median_val = df[target_col].median()
+        nan_count = df[target_col].isna().sum()
+        if verbose:
+            print(f"  Filling {nan_count} NaN values in con_rating_strength with median={median_val:.4f}")
+
+        # Create a copy and fill NaN
+        df = df.copy()
+        df[target_col] = df[target_col].fillna(median_val)
+
+        if verbose:
+            print(f"  Verified: {df[target_col].isna().sum()} NaN remaining")
+
+        return df
+
+    if verbose:
+        print("  con_rating_strength not found, skipping NaN fill")
+
+    return raw_data
+
+
+class FixedDiff:
+ """
+ Fixed Diff processor that correctly handles :: separator column format.
+
+ The original qlib Diff processor has a bug where it creates column names like:
+ "('feature_ext', 'log_size')_diff" (string representation of tuple)
+ Instead of:
+ 'feature_ext::log_size_diff' (proper :: separator format)
+ """
+
+ def __init__(self, fields_group, suffix="diff", periods=1):
+ self.fields_group = fields_group
+ self.suffix = suffix
+ self.periods = periods
+
+ def __call__(self, df):
+ import pandas as pd
+
+ # Get columns for this group - handle :: separator format
+ cols = [c for c in df.columns if c.startswith(f"{self.fields_group}::")]
+ df_cols = df[cols]
+ cols_name = df_cols.columns
+
+ # Apply diff transformation
+ df_cols_diff = df_cols.groupby("instrument").transform(
+ lambda x: x.ffill().diff(self.periods).fillna(0.)
+ )
+ df_cols = pd.concat([df_cols, df_cols_diff], axis=1)
+
+ # Create new column names with suffix appended
+ new_cols = []
+ for name in cols_name:
+ new_cols.append(name)
+ new_cols.append(f"{name}_{self.suffix}")
+
+ df_cols.columns = new_cols
+ df[df_cols.columns] = df_cols
+ return df
+
+
+class FixedColumnRemover:
+    """Fixed ColumnRemover that handles :: separator format.
+
+    NOTE(review): unlike the other Fixed* processors, ``fields_group`` here
+    is iterated as a collection of full column names to drop, not used as a
+    group prefix -- presumably mirroring the pickled ColumnRemover's state;
+    confirm against the original processor.
+    """
+
+    def __init__(self, fields_group):
+        # fields_group: iterable of exact column names to remove.
+        self.fields_group = fields_group
+
+    def __call__(self, df):
+        # Collect only columns actually present; errors='ignore' below is
+        # then redundant, but kept as a safety net.
+        cols_to_remove = []
+        for item in self.fields_group:
+            if item in df.columns:
+                cols_to_remove.append(item)
+        return df.drop(columns=cols_to_remove, errors='ignore')
+
+
+class FixedFlagToOnehot:
+    """Fixed FlagToOnehot that handles :: separator format."""
+
+    def __init__(self, fields_group, onehot_group, format_compact=False):
+        # fields_group: source group prefix holding one flag column per code.
+        # onehot_group: destination group prefix for the int-cast copies.
+        # format_compact: stored for pickle-compat; not used by __call__.
+        self.fields_group = fields_group
+        self.onehot_group = onehot_group
+        self.format_compact = format_compact
+
+    def __call__(self, df):
+        import pandas as pd
+        # Copy each "<fields_group>::<code>" column to "<onehot_group>::<code>"
+        # cast to int; the source flag columns are kept in place.
+        # NOTE: astype(int) raises IntCastingNaNError if a flag column still
+        # contains NaN -- see _fill_con_rating_nan for the related workaround.
+        cols = [c for c in df.columns if c.startswith(f"{self.fields_group}::")]
+        for col in cols:
+            industry_code = col.split('::')[1]
+            new_col = f"{self.onehot_group}::{industry_code}"
+            df[new_col] = df[col].astype(int)
+        return df
+
+
+class FixedIndusNtrlInjector:
+    """Fixed IndusNtrlInjector that handles :: separator format.
+
+    Adds, for every "<input_group>::<name>" column, an industry-z-scored
+    "<input_group>::<name><ntrl_suffix>" column.
+    """
+
+    def __init__(self, fields_group, input_group, indus_group,
+                 indus_suffix="_ntrl", ntrl_suffix="_ntrl", keep_origin=True,
+                 include_indus=False, include_indus_std=False, norm_by_ntrl=False):
+        # Only input_group, indus_group and ntrl_suffix drive the logic in
+        # __call__; the remaining arguments are accepted so state copied from
+        # the pickled processor round-trips, but are otherwise unused here.
+        self.fields_group = fields_group
+        self.input_group = input_group
+        self.indus_group = indus_group
+        self.indus_suffix = indus_suffix
+        self.ntrl_suffix = ntrl_suffix
+        self.keep_origin = keep_origin
+        self.include_indus = include_indus
+        self.include_indus_std = include_indus_std
+        self.norm_by_ntrl = norm_by_ntrl
+
+    def __call__(self, df):
+        import pandas as pd
+        import numpy as np
+
+        feature_cols = [c for c in df.columns if c.startswith(f"{self.input_group}::")]
+        indus_cols = [c for c in df.columns if c.startswith(f"{self.indus_group}::")]
+
+        # Get primary industry column (first one with any True values)
+        # NOTE(review): rows are then grouped by the VALUES of that single
+        # flag column, not by each row's assigned industry across all flag
+        # columns -- confirm this matches the original processor's semantics.
+        indus_assign = None
+        for ic in indus_cols:
+            if df[ic].any():
+                indus_assign = ic
+                break
+
+        # No usable industry column: pass the data through unchanged.
+        if indus_assign is None:
+            return df
+
+        for feat_col in feature_cols:
+            feat_name = feat_col.split('::')[1]
+            grouped = df.groupby(indus_assign)[feat_col]
+            indus_mean = grouped.transform('mean')
+            indus_std = grouped.transform('std')
+            ntrl_col = f"{self.input_group}::{feat_name}{self.ntrl_suffix}"
+            # Zero stds become NaN so constant groups yield NaN, not +/-inf.
+            df[ntrl_col] = (df[feat_col] - indus_mean) / indus_std.replace(0, np.nan)
+
+        return df
+
+
+class FixedRobustZScoreNorm:
+    """Fixed RobustZScoreNorm that handles :: separator format with trained params."""
+
+    def __init__(self, fields_group, mean_train, std_train, clip_outlier=True, cols=None):
+        # mean_train / std_train: per-column statistics copied from the
+        #   original fitted processor (indexed positionally).
+        # clip_outlier / cols: stored for pickle-compat; not used by __call__.
+        self.fields_group = fields_group
+        self.mean_train = mean_train
+        self.std_train = std_train
+        self.clip_outlier = clip_outlier
+        self.cols = cols
+
+    def __call__(self, df):
+        import pandas as pd
+        import numpy as np
+
+        # Get columns to normalize
+        if isinstance(self.fields_group, list):
+            cols_to_norm = []
+            for grp in self.fields_group:
+                cols_to_norm.extend([c for c in df.columns if c.startswith(f"{grp}::")])
+        else:
+            cols_to_norm = [c for c in df.columns if c.startswith(f"{self.fields_group}::")]
+
+        # Apply normalization using trained mean/std
+        # NOTE(review): stats are matched to columns by POSITION, which
+        # assumes cols_to_norm enumerates columns in exactly the order the
+        # stats were fitted -- verify against the pickled processor.
+        # Columns with std <= 0, or beyond the stats' length, are left
+        # unnormalized.
+        if self.mean_train is not None and self.std_train is not None:
+            for i, col in enumerate(cols_to_norm):
+                if i < len(self.mean_train) and i < len(self.std_train):
+                    mean_val = self.mean_train[i]
+                    std_val = self.std_train[i]
+                    if std_val > 0:
+                        df[col] = (df[col] - mean_val) / std_val
+        return df
+
+
+class FixedFillna:
+    """Fixed Fillna that handles :: separator format."""
+
+    def __init__(self, fields_group, fill_value=0):
+        # fields_group: a single group prefix or a list of prefixes.
+        # fill_value: constant substituted for NaN (default 0).
+        self.fields_group = fields_group
+        self.fill_value = fill_value
+
+    def __call__(self, df):
+        # Resolve "<group>::*" columns for one prefix or a list of prefixes.
+        if isinstance(self.fields_group, list):
+            cols_to_fill = []
+            for grp in self.fields_group:
+                cols_to_fill.extend([c for c in df.columns if c.startswith(f"{grp}::")])
+        else:
+            cols_to_fill = [c for c in df.columns if c.startswith(f"{self.fields_group}::")]
+        df[cols_to_fill] = df[cols_to_fill].fillna(self.fill_value)
+        return df
+
+
+class FixedFlagMarketInjector:
+    """Fixed FlagMarketInjector that handles :: separator format."""
+
+    def __init__(self, fields_group, vocab_size=2):
+        # vocab_size: stored for pickle-compat; not used by __call__.
+        self.fields_group = fields_group
+        self.vocab_size = vocab_size
+
+    def __call__(self, df):
+        # Cast every flag column in the group to int8 in place.
+        # NOTE: astype('int8') raises IntCastingNaNError on NaN -- this is
+        # the failure _fill_con_rating_nan works around upstream.
+        cols = [c for c in df.columns if c.startswith(f"{self.fields_group}::")]
+        for col in cols:
+            df[col] = df[col].astype('int8')
+        return df
+
+
+class FixedFlagSTInjector:
+    """Fixed FlagSTInjector that handles :: separator format."""
+
+    def __init__(self, fields_group, st_group="st_flag", col_name="IsST"):
+        # Only st_group drives __call__; fields_group and col_name are
+        # stored for pickle-compat with the original processor.
+        self.fields_group = fields_group
+        self.st_group = st_group
+        self.col_name = col_name
+
+    def __call__(self, df):
+        # Cast every "<st_group>::*" column to int8 in place (same NaN
+        # caveat as FixedFlagMarketInjector).
+        cols = [c for c in df.columns if c.startswith(f"{self.st_group}::")]
+        for col in cols:
+            df[col] = df[col].astype('int8')
+        return df
+
+
+def convert_columns_to_double_colon(df):
+    """
+    Convert MultiIndex tuple columns to '::' separator string format.
+
+    This is needed because the proc_list was trained on data with column names like:
+    'feature_ext::log_size'
+    But our loader produces MultiIndex tuples:
+    ('feature_ext', 'log_size')
+
+    Args:
+        df: DataFrame with MultiIndex tuple columns
+
+    Returns:
+        DataFrame with string columns using '::' separator
+    """
+    # Columns already flat (non-MultiIndex): pass through unchanged.
+    if not isinstance(df.columns, pd.MultiIndex):
+        return df
+
+    # Create new column names with :: separator
+    # NOTE: assumes exactly a 2-level MultiIndex (group, name); a deeper
+    # index would raise during tuple unpacking.
+    new_columns = [f"{grp}::{col}" for grp, col in df.columns]
+    df_copy = df.copy()
+    df_copy.columns = new_columns
+    return df_copy
+
+
+def convert_columns_from_double_colon(df):
+    """
+    Convert '::' separator string columns back to MultiIndex tuple format.
+
+    Args:
+        df: DataFrame with '::' separator string columns
+
+    Returns:
+        DataFrame with MultiIndex tuple columns
+    """
+    # Check if columns are strings with :: separator
+    # NOTE: every pandas columns object (including MultiIndex) is a
+    # pd.Index, so this guard never triggers; the has_double_colon check
+    # below is what actually decides whether to convert.
+    if not isinstance(df.columns, pd.Index):
+        return df
+
+    # Check if any column contains ::
+    has_double_colon = any(isinstance(c, str) and '::' in c for c in df.columns)
+    if not has_double_colon:
+        return df
+
+    # Convert to MultiIndex
+    # maxsplit=1 keeps any additional '::' inside the name part intact.
+    new_columns = [tuple(c.split('::', 1)) for c in df.columns]
+    df_copy = df.copy()
+    df_copy.columns = pd.MultiIndex.from_tuples(new_columns)
+    return df_copy
+
+
+def load_and_dump_data(
+ since_date: str,
+ end_date: str,
+ output_dir: Optional[str] = None,
+ proc_list_path: Optional[str] = None,
+ buffer_days: int = 20,
+ yaml_path: Optional[str] = None,
+ verbose: bool = True,
+ skip_proc_list: bool = False,
+ fill_con_rating_nan: bool = True # New parameter
+) -> tuple:
+ """
+ Load data and dump both raw (before proc_list) and processed (after proc_list) versions.
+
+ This function saves:
+ 1. Raw data from handler (before applying preprocessing pipeline)
+ 2. Processed data (after applying proc_list) - if not skipped and if successful
+
+ Note: The proc_list may fail due to compatibility issues with the data. In this case,
+ only the raw data will be saved.
+
+ Args:
+ since_date: Start date for the data (YYYY-MM-DD)
+ end_date: End date for the data (YYYY-MM-DD)
+ output_dir: Output directory for pickle files (default: data/ folder)
+ proc_list_path: Path to proc_list.proc file
+ buffer_days: Extra days to load before since_date
+ yaml_path: Path to handler.yaml
+ verbose: Print debug information
+ skip_proc_list: If True, skip applying proc_list entirely
+ fill_con_rating_nan: If True, fill NaN in con_rating_strength before proc_list
+
+ Returns:
+ tuple: (raw_df, processed_df or None)
+ """
+ import pickle as pkl
+ from qlib.contrib.data.utils import apply_proc_list
+
+ # Default output directory (data/ folder in project)
+ if output_dir is None:
+ output_dir = PROJECT_DIR / "data"
+ else:
+ output_dir = Path(output_dir)
+
+ # Default proc_list path
+ if proc_list_path is None:
+ proc_list_path = "/home/guofu/Workspaces/alpha/data_ops/tasks/dwm_feature_vae/dataset/csiallx_feature2_ntrla_flag_pnlnorm/proc_list.proc"
+
+ if verbose:
+ print("=" * 60)
+ print("Loading and dumping data")
+ print("=" * 60)
+ print(f" Output directory: {output_dir}")
+ print()
+
+ # Ensure output directory exists
+ output_dir.mkdir(parents=True, exist_ok=True)
+
+ # Convert dates
+ if isinstance(since_date, str):
+ since_date = pd.to_datetime(since_date)
+ if isinstance(end_date, str):
+ end_date = pd.to_datetime(end_date)
+
+ # Step 1: Load raw data from handler (with buffer period, NOT filtered)
+ if verbose:
+ print("Step 1: Loading raw data from handler (with buffer period)...")
+ print(f" Requested range: {since_date.date()} to {end_date.date()}")
+ print(f" Buffer days: {buffer_days}")
+
+ # Load yaml config
+ yaml_path = yaml_path or HANDLER_YAML_PATH
+ yaml_path = Path(yaml_path)
+ if not yaml_path.exists():
+ raise FileNotFoundError(f"handler.yaml not found at {yaml_path}")
+
+ yaml_loader = YAML(typ='safe', pure=True)
+ with open(yaml_path) as f:
+ config = yaml_loader.load(f)
+
+ # Initialize qlib
+ from qlib.workflow.cli import sys_config
+ config_path = "qlib.contrib.data.config"
+ sys_config(config, config_path)
+ qlib.init(**config.get("qlib_init"))
+
+ # Calculate load start (with buffer for diff calculations)
+ load_start = since_date - pd.Timedelta(days=buffer_days)
+
+ # Prepare placeholder values
+    placeholder_value = {
+        "<LOAD_START>": load_start,
+        "<LOAD_END>": end_date,
+    }
+
+ # Load placeholders from handler module
+ try:
+ placeholder_value.update(
+ load_placehorder_from_module(config["handler"])
+ )
+ except Exception as e:
+ if verbose:
+ print(f" Note: Could not load placeholders from handler module: {e}")
+
+ # Fill placeholders in config
+ config = fill_placeholder(config, placeholder_value)
+
+ # Initialize handler and load data
+ handler = init_instance_by_config(config["handler"])
+ raw_data = handler._data # Keep as SepDataFrame
+
+ if verbose:
+ if hasattr(raw_data, 'to_frame'):
+ tmp_df = raw_data.to_frame()
+ else:
+ tmp_df = raw_data
+ print(f" Loaded data shape (with buffer): {tmp_df.shape}")
+ print(f" Data index levels: {tmp_df.index.names}")
+
+ # Step 2: Dump raw data (before proc_list, filtered to requested range)
+ # Filter first for dumping
+ if isinstance(raw_data.index, pd.MultiIndex):
+ raw_data_filtered = raw_data.loc(axis=0)[slice(since_date, end_date), :]
+ else:
+ raw_data_filtered = raw_data.loc[slice(since_date, end_date)]
+
+ if hasattr(raw_data_filtered, 'to_frame'):
+ raw_df = raw_data_filtered.to_frame()
+ else:
+ raw_df = raw_data_filtered
+
+ raw_output_path = output_dir / f"raw_data_{since_date.strftime('%Y%m%d')}_{end_date.strftime('%Y%m%d')}.pkl"
+ if verbose:
+ print(f"\nStep 2: Dumping raw data (filtered) to {raw_output_path}...")
+ print(f" Raw data shape (filtered): {raw_df.shape}")
+
+ with open(raw_output_path, "wb") as f:
+ pkl.dump(raw_df, f)
+
+ if verbose:
+ print(f" Saved: {raw_output_path}")
+
+ # Skip proc_list if requested
+ if skip_proc_list:
+ if verbose:
+ print("\nStep 3: Skipping proc_list as requested")
+ print()
+ print("=" * 60)
+ print("Summary:")
+ print(f" Raw data: {raw_df.shape} -> {raw_output_path}")
+ print(f" Processed data: SKIPPED")
+ print("=" * 60)
+ return raw_df, None
+
+ # Step 3: Load preprocessing pipeline (proc_list)
+ if verbose:
+ print("\nStep 3: Loading preprocessing pipeline (proc_list)...")
+ print(f" Path: {proc_list_path}")
+
+ with open(proc_list_path, "rb") as f:
+ proc_list = pkl.load(f)
+
+ if verbose:
+ print(f" Number of processors: {len(proc_list)}")
+ for i, proc in enumerate(proc_list):
+ print(f" [{i}] {type(proc).__name__}")
+
+ # Step 4: Apply proc_list (BEFORE filtering, on data with buffer period)
+ if verbose:
+ print("\nStep 4: Applying preprocessing pipeline (on data with buffer)...")
+
+ # Convert SepDataFrame to DataFrame for processing
+ if hasattr(raw_data, 'to_frame'):
+ df_for_proc = raw_data.to_frame()
+ else:
+ df_for_proc = raw_data.copy()
+
+ # Fill NaN in con_rating_strength if requested (workaround for IntCastingNaNError)
+ if fill_con_rating_nan:
+ if verbose:
+ print(" Pre-processing: Filling NaN in con_rating_strength...")
+ df_for_proc = _fill_con_rating_nan(df_for_proc, verbose=verbose)
+
+ # Convert columns from MultiIndex tuples to :: separator format
+ # The proc_list was trained on data with 'feature_ext::log_size' format
+ df_for_proc = convert_columns_to_double_colon(df_for_proc)
+
+ if verbose:
+ print(f" Converted columns: {list(df_for_proc.columns[:5])}...")
+
+ # Replace Diff processor with FixedDiff - keep all other processors from pickle
+ # FixedDiff handles the :: separator format and fixes the column naming bug
+ fixed_proc_list = []
+ for proc in proc_list:
+ proc_name = type(proc).__name__
+ if proc_name == 'Diff':
+ if verbose:
+ print(f" Replacing Diff with FixedDiff (fields_group={proc.fields_group})")
+ fixed_proc = FixedDiff(
+ fields_group=proc.fields_group,
+ suffix=proc.suffix,
+ periods=proc.periods
+ )
+ fixed_proc_list.append(fixed_proc)
+ elif proc_name == 'FlagMarketInjector':
+ if verbose:
+ print(f" Replacing FlagMarketInjector with FixedFlagMarketInjector")
+ fixed_proc = FixedFlagMarketInjector(
+ fields_group=proc.fields_group,
+ vocab_size=getattr(proc, 'vocab_size', 2)
+ )
+ fixed_proc_list.append(fixed_proc)
+ elif proc_name == 'FlagSTInjector':
+ if verbose:
+ print(f" Replacing FlagSTInjector with FixedFlagSTInjector")
+ fixed_proc = FixedFlagSTInjector(
+ fields_group=proc.fields_group,
+ st_group=getattr(proc, 'st_group', 'st_flag'),
+ col_name=getattr(proc, 'col_name', 'IsST')
+ )
+ fixed_proc_list.append(fixed_proc)
+ elif proc_name == 'ColumnRemover':
+ if verbose:
+ print(f" Replacing ColumnRemover with FixedColumnRemover")
+ fixed_proc = FixedColumnRemover(fields_group=proc.fields_group)
+ fixed_proc_list.append(fixed_proc)
+ elif proc_name == 'FlagToOnehot':
+ if verbose:
+ print(f" Replacing FlagToOnehot with FixedFlagToOnehot")
+ fixed_proc = FixedFlagToOnehot(
+ fields_group=proc.fields_group,
+ onehot_group=getattr(proc, 'onehot_group', 'industry')
+ )
+ fixed_proc_list.append(fixed_proc)
+ elif proc_name == 'IndusNtrlInjector':
+ if verbose:
+ print(f" Replacing IndusNtrlInjector with FixedIndusNtrlInjector (fields_group={proc.fields_group})")
+ fixed_proc = FixedIndusNtrlInjector(
+ fields_group=proc.fields_group,
+ input_group=getattr(proc, 'input_group', proc.fields_group),
+ indus_group=getattr(proc, 'indus_group', 'indus_flag'),
+ ntrl_suffix=getattr(proc, 'ntrl_suffix', '_ntrl')
+ )
+ fixed_proc_list.append(fixed_proc)
+ elif proc_name == 'RobustZScoreNorm':
+ if verbose:
+ print(f" Replacing RobustZScoreNorm with FixedRobustZScoreNorm (using trained mean/std)")
+ fixed_proc = FixedRobustZScoreNorm(
+ fields_group=proc.fields_group,
+ mean_train=getattr(proc, 'mean_train', None),
+ std_train=getattr(proc, 'std_train', None),
+ clip_outlier=getattr(proc, 'clip_outlier', True)
+ )
+ fixed_proc_list.append(fixed_proc)
+ elif proc_name == 'Fillna':
+ if verbose:
+ print(f" Replacing Fillna with FixedFillna")
+ fixed_proc = FixedFillna(
+ fields_group=proc.fields_group,
+ fill_value=getattr(proc, 'fill_value', 0)
+ )
+ fixed_proc_list.append(fixed_proc)
+ else:
+ # Keep original processor for unknown types
+ fixed_proc_list.append(proc)
+
+ try:
+ # Apply the fixed proc_list
+ if verbose:
+ print("\n Applying fixed preprocessing pipeline...")
+ processed_data = apply_proc_list(df_for_proc, proc_list=fixed_proc_list, with_fit=False)
+
+ if verbose:
+ print(f" Processed data shape: {processed_data.shape}")
+ print(f" Processed columns sample: {list(processed_data.columns[:5])}...")
+
+ # Convert columns back to MultiIndex tuples for consistency
+ processed_data = convert_columns_from_double_colon(processed_data)
+
+ # Now filter to requested date range
+ if isinstance(processed_data.index, pd.MultiIndex):
+ processed_data_filtered = processed_data.loc(axis=0)[slice(since_date, end_date), :]
+ else:
+ processed_data_filtered = processed_data.loc[slice(since_date, end_date)]
+
+ if verbose:
+ print(f" Processed data shape (filtered): {processed_data_filtered.shape}")
+
+ # Step 5: Dump processed data (after proc_list)
+ processed_output_path = output_dir / f"processed_data_{since_date.strftime('%Y%m%d')}_{end_date.strftime('%Y%m%d')}.pkl"
+ if verbose:
+ print(f"\nStep 5: Dumping processed data to {processed_output_path}...")
+
+ with open(processed_output_path, "wb") as f:
+ pkl.dump(processed_data_filtered, f)
+
+ if verbose:
+ print(f" Saved: {processed_output_path}")
+ print()
+ print("=" * 60)
+ print("Summary:")
+ print(f" Raw data: {raw_df.shape} -> {raw_output_path}")
+ print(f" Processed data: {processed_data_filtered.shape} -> {processed_output_path}")
+ print("=" * 60)
+
+ return raw_df, processed_data_filtered
+
+ except Exception as e:
+ if verbose:
+ print(f"\nERROR applying proc_list: {e}")
+ print("Only raw data was saved. The proc_list may have compatibility issues")
+ print("with this data (e.g., NaN values in columns that need int8 conversion).")
+ print()
+ print("=" * 60)
+ print("Summary:")
+ print(f" Raw data: {raw_df.shape} -> {raw_output_path}")
+ print(f" Processed data: FAILED ({type(e).__name__})")
+ print("=" * 60)
+ return raw_df, None
+
+
# Convenience function for quick testing
if __name__ == "__main__":
    # Smoke test: load a small fixed one-month window so a manual run is
    # fast and reproducible.
    test_since = "2019-01-01"
    test_end = "2019-01-31"

    print(f"Testing data loader with date range: {test_since} to {test_end}")
    print()

    try:
        df = load_data_from_handler(
            since_date=test_since,
            end_date=test_end,
            buffer_days=20,
            verbose=True,
        )
        print(f"\nSuccess! Loaded {len(df)} rows")
        # The handler normally yields a (datetime, instrument) MultiIndex.
        # Guard the level lookup so the summary line cannot raise after a
        # successful load (previously a plain index here would throw inside
        # the try and the run was mislabeled as an error).
        if isinstance(df.index, pd.MultiIndex) and "datetime" in df.index.names:
            dates = df.index.get_level_values("datetime")
        else:
            dates = df.index
        print(f"Date range in data: {dates.min()} to {dates.max()}")
    except Exception as e:
        # Broad catch is deliberate for a manual smoke test: report the
        # failure and print the full traceback instead of unwinding.
        print(f"\nError: {e}")
        import traceback
        traceback.print_exc()