#!/usr/bin/env python
"""
Script to generate and dump transformed features from the alpha158_beta pipeline.
This script provides fine-grained control over the feature generation and dumping process:
- Select which feature groups to dump (alpha158, market_ext, market_flag, merged, vae_input)
- Choose output format (parquet, pickle, numpy)
- Control date range and universe filtering
- Save intermediate pipeline outputs
- Enable streaming mode for large datasets (>1 year)
Usage:
# Dump all features to parquet
python dump_features.py --start-date 2025-01-01 --end-date 2025-01-31
# Dump only alpha158 features to pickle
python dump_features.py --groups alpha158 --format pickle
# Dump with custom output path
python dump_features.py --output /path/to/output.parquet
# Dump merged features with all columns
python dump_features.py --groups merged --verbose
# Use streaming mode for large date ranges (>1 year)
python dump_features.py --start-date 2020-01-01 --end-date 2023-12-31 --streaming
"""
import os
import sys
import argparse
from pathlib import Path
from typing import Optional, List
# Add src to path for imports
SCRIPT_DIR = Path(__file__).parent
sys.path.insert(0, str(SCRIPT_DIR.parent / 'src'))
from processors import (
FeaturePipeline,
FeatureGroups,
VAE_INPUT_DIM,
ALPHA158_COLS,
MARKET_EXT_BASE_COLS,
COLUMNS_TO_REMOVE,
get_groups,
dump_to_parquet,
dump_to_pickle,
dump_to_numpy,
)
# Default output directory
DEFAULT_OUTPUT_DIR = SCRIPT_DIR.parent / "data"
def generate_and_dump(
    start_date: str,
    end_date: str,
    output_path: str,
    output_format: str = 'parquet',
    groups: Optional[List[str]] = None,
    universe: str = 'csiallx',
    filter_universe: bool = True,
    robust_zscore_params_path: Optional[str] = None,
    verbose: bool = True,
    pack_struct: bool = False,
    streaming: bool = False,
) -> None:
    """
    Generate features and dump to file.

    Args:
        start_date: Start date in YYYY-MM-DD format
        end_date: End date in YYYY-MM-DD format
        output_path: Output file path. When anything other than the single
            'merged' group is dumped, each group's file gets a ``_{group}``
            suffix inserted before the extension.
        output_format: Output format ('parquet', 'pickle', 'numpy');
            unrecognized values fall back to parquet
        groups: Feature groups to dump (default: ['merged'])
        universe: Stock universe name
        filter_universe: Whether to filter to stock universe
        robust_zscore_params_path: Path to robust zscore parameters
        verbose: Whether to print progress
        pack_struct: If True, pack each feature group into struct columns
            (features_alpha158, features_market_ext, features_market_flag)
        streaming: If True, use Polars streaming mode for large datasets
            (>1 year)
    """
    if groups is None:
        groups = ['merged']

    print("=" * 60)
    print("Feature Dump Script")
    print("=" * 60)
    print(f"Date range: {start_date} to {end_date}")
    print(f"Output format: {output_format}")
    print(f"Feature groups: {groups}")
    print(f"Universe: {universe} (filter: {filter_universe})")
    print(f"Pack struct: {pack_struct}")
    print(f"Output path: {output_path}")
    print("=" * 60)

    # Initialize pipeline
    pipeline = FeaturePipeline(
        robust_zscore_params_path=robust_zscore_params_path
    )

    # Load data
    feature_groups = pipeline.load_data(
        start_date, end_date,
        filter_universe=filter_universe,
        universe_name=universe,
        streaming=streaming,
    )

    # Apply transformations - the pipeline always returns a merged DataFrame.
    df_transformed = pipeline.transform(feature_groups, pack_struct=pack_struct)

    # Select the requested feature groups from the merged DataFrame.
    outputs = get_groups(df_transformed, groups, verbose, use_struct=False)

    # Ensure the output directory exists before any dump call writes to it.
    output_dir = os.path.dirname(output_path)
    if output_dir:
        os.makedirs(output_dir, exist_ok=True)

    if output_format == 'numpy':
        # NOTE(review): this dumps the *pre-transform* ``feature_groups``
        # rather than the transformed ``outputs`` — the original comment
        # claimed it saves the merged features; confirm intent.
        dump_to_numpy(feature_groups, output_path, include_metadata=True, verbose=verbose)
    else:
        # pickle and parquet shared identical routing logic; dispatch on the
        # dump function instead of duplicating the branches. Unknown formats
        # fall back to parquet, matching the original else-branch.
        dumper = dump_to_pickle if output_format == 'pickle' else dump_to_parquet
        if 'merged' in outputs:
            dumper(outputs['merged'], output_path, verbose=verbose)
        else:
            # One file per group, with the group name suffixed to the stem.
            # (The original single-group and multi-group branches performed
            # exactly the same suffixing, so they are merged here.)
            base_path = Path(output_path)
            for key, df_out in outputs.items():
                dump_path = str(base_path.with_name(f"{base_path.stem}_{key}{base_path.suffix}"))
                dumper(df_out, dump_path, verbose=verbose)

    print("=" * 60)
    print("Feature dump complete!")
    print("=" * 60)
def main():
    """CLI entry point: parse arguments and launch the feature dump."""
    args = _build_parser().parse_args()

    # --quiet overrides --verbose (which defaults to True, so -v is a
    # convenience no-op kept for symmetry).
    verbose = args.verbose and not args.quiet

    # Default output path: {data_dir}/features.parquet.
    # generate_and_dump inserts a per-group suffix when needed, so the
    # plain base name "features" is sufficient here.
    if args.output is None:
        output_path = str(DEFAULT_OUTPUT_DIR / "features.parquet")
    else:
        output_path = args.output

    generate_and_dump(
        start_date=args.start_date,
        end_date=args.end_date,
        output_path=output_path,
        output_format=args.format,
        groups=args.groups,
        universe=args.universe,
        filter_universe=not args.no_filter_universe,
        robust_zscore_params_path=args.robust_zscore_params,
        verbose=verbose,
        pack_struct=args.pack_struct,
        streaming=args.streaming,
    )


def _build_parser() -> argparse.ArgumentParser:
    """Construct the argument parser for the feature-dump CLI."""
    parser = argparse.ArgumentParser(
        description="Generate and dump transformed features from alpha158_beta pipeline"
    )

    # Date range (both ends are mandatory).
    parser.add_argument(
        "--start-date", type=str, required=True,
        help="Start date in YYYY-MM-DD format",
    )
    parser.add_argument(
        "--end-date", type=str, required=True,
        help="End date in YYYY-MM-DD format",
    )

    # Output settings.
    parser.add_argument(
        "--output", "-o", type=str, default=None,
        help=f"Output file path (default: {DEFAULT_OUTPUT_DIR}/features.parquet)",
    )
    parser.add_argument(
        "--format", "-f", type=str, default='parquet',
        choices=['parquet', 'pickle', 'numpy'],
        help="Output format (default: parquet)",
    )

    # Feature group selection.
    parser.add_argument(
        "--groups", "-g", type=str, nargs='+', default=['merged'],
        choices=['merged', 'alpha158', 'market_ext', 'market_flag', 'vae_input'],
        help="Feature groups to dump (default: merged)",
    )

    # Universe settings.
    parser.add_argument(
        "--universe", type=str, default='csiallx',
        help="Stock universe name (default: csiallx)",
    )
    parser.add_argument(
        "--no-filter-universe", action="store_true",
        help="Disable stock universe filtering",
    )

    # Robust zscore parameters.
    parser.add_argument(
        "--robust-zscore-params", type=str, default=None,
        help="Path to robust zscore parameters directory",
    )

    # Verbosity flags.
    parser.add_argument(
        "--verbose", "-v", action="store_true", default=True,
        help="Enable verbose output (default: True)",
    )
    parser.add_argument(
        "--quiet", "-q", action="store_true",
        help="Disable verbose output",
    )

    # Struct packing.
    parser.add_argument(
        "--pack-struct", "-s", action="store_true",
        help="Pack each feature group into separate struct columns (features_alpha158, features_market_ext, features_market_flag)",
    )

    # Streaming mode.
    parser.add_argument(
        "--streaming", action="store_true",
        help="Use Polars streaming mode for large datasets (recommended for date ranges > 1 year)",
    )

    return parser
if __name__ == "__main__":
main()